blob: 640703b78bbe946f99dc79c4f2e456c1901be5a7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System.Buffers;
using System.Buffers.Binary;
using System.Text;
using Apache.Iggy.ConnectionStream;
using Apache.Iggy.Contracts;
using Apache.Iggy.Contracts.Auth;
using Apache.Iggy.Contracts.Tcp;
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
using Apache.Iggy.Kinds;
using Apache.Iggy.Mappers;
using Apache.Iggy.Messages;
using Apache.Iggy.Utils;
using Microsoft.Extensions.Logging;
using Partitioning = Apache.Iggy.Kinds.Partitioning;
namespace Apache.Iggy.IggyClient.Implementations;
/// <summary>
/// <see cref="IIggyClient" /> implementation that exchanges binary commands with a server over an
/// <see cref="IConnectionStream" />.
/// </summary>
/// <remarks>
/// Every public operation follows the same shape: serialize the request body, wrap it together with a
/// command code via <c>TcpMessageStreamHelpers.CreatePayload</c>, send it through
/// <see cref="SendWithResponseAsync" /> and, when a body comes back, map it with <c>BinaryMapper</c>.
/// Requests are serialized by a single-slot <see cref="SemaphoreSlim" />, so only one request/response
/// exchange uses the underlying stream at a time. A non-zero response status surfaces as
/// <see cref="InvalidResponseException" />.
/// </remarks>
public sealed class TcpMessageStream : IIggyClient, IDisposable
{
    private readonly ILogger<TcpMessageStream> _logger;
    private readonly SemaphoreSlim _semaphore;
    private readonly IConnectionStream _stream;

    /// <summary>Creates a client on top of an already constructed connection stream.</summary>
    /// <param name="stream">Stream used for all sends and reads; closed and disposed by <see cref="Dispose" />.</param>
    /// <param name="loggerFactory">Factory used to create this type's logger.</param>
    internal TcpMessageStream(IConnectionStream stream, ILoggerFactory loggerFactory)
    {
        _stream = stream;
        _logger = loggerFactory.CreateLogger<TcpMessageStream>();
        _semaphore = new SemaphoreSlim(1, 1);
    }

    /// <summary>Closes and disposes the connection stream, then disposes the request semaphore.</summary>
    public void Dispose()
    {
        _stream.Close();
        _stream.Dispose();
        _semaphore.Dispose();
    }

    /// <summary>Sends the CREATE_STREAM command.</summary>
    /// <returns>The mapped stream returned by the server.</returns>
    /// <exception cref="InvalidResponseException">The server answered with an error or with an empty body.</exception>
    public async Task<StreamResponse?> CreateStreamAsync(string name, uint? streamId = null,
        CancellationToken token = default)
    {
        var message = TcpContracts.CreateStream(name, streamId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_STREAM_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            throw new InvalidResponseException("Received empty response while trying to create stream.");
        }

        return BinaryMapper.MapStream(responseBuffer);
    }

    /// <summary>Sends the GET_STREAM command for <paramref name="streamId" />.</summary>
    /// <returns>The mapped stream, or <c>null</c> when the response body is empty.</returns>
    public async Task<StreamResponse?> GetStreamByIdAsync(Identifier streamId, CancellationToken token = default)
    {
        var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STREAM_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapStream(responseBuffer);
    }

    /// <summary>Sends the GET_STREAMS command (no request body).</summary>
    /// <returns>The mapped streams, or an empty list when the response body is empty.</returns>
    public async Task<IReadOnlyList<StreamResponse>> GetStreamsAsync(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STREAMS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return [];
        }

        return BinaryMapper.MapStreams(responseBuffer);
    }

    /// <summary>Sends the UPDATE_STREAM command; the response body is ignored.</summary>
    public async Task UpdateStreamAsync(Identifier streamId, string name, CancellationToken token = default)
    {
        var message = TcpContracts.UpdateStream(streamId, name);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_STREAM_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the PURGE_STREAM command; the response body is ignored.</summary>
    public async Task PurgeStreamAsync(Identifier streamId, CancellationToken token = default)
    {
        var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.PURGE_STREAM_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the DELETE_STREAM command; the response body is ignored.</summary>
    public async Task DeleteStreamAsync(Identifier streamId, CancellationToken token = default)
    {
        var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_STREAM_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the GET_TOPICS command for <paramref name="streamId" />.</summary>
    /// <returns>The mapped topics, or an empty list when the response body is empty.</returns>
    public async Task<IReadOnlyList<TopicResponse>> GetTopicsAsync(Identifier streamId,
        CancellationToken token = default)
    {
        var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_TOPICS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return [];
        }

        return BinaryMapper.MapTopics(responseBuffer);
    }

    /// <summary>Sends the GET_TOPIC command.</summary>
    /// <returns>The mapped topic, or <c>null</c> when the response body is empty.</returns>
    public async Task<TopicResponse?> GetTopicByIdAsync(Identifier streamId, Identifier topicId,
        CancellationToken token = default)
    {
        var message = TcpContracts.GetTopicById(streamId, topicId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_TOPIC_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapTopic(responseBuffer);
    }

    /// <summary>Sends the CREATE_TOPIC command.</summary>
    /// <returns>The mapped topic, or <c>null</c> when the response body is empty.</returns>
    public async Task<TopicResponse?> CreateTopicAsync(Identifier streamId, string name, uint partitionsCount,
        CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None,
        uint? topicId = null, byte? replicationFactor = null, ulong messageExpiry = 0, ulong maxTopicSize = 0,
        CancellationToken token = default)
    {
        var message = TcpContracts.CreateTopic(streamId, name, partitionsCount, compressionAlgorithm, topicId,
            replicationFactor, messageExpiry, maxTopicSize);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_TOPIC_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapTopic(responseBuffer);
    }

    /// <summary>Sends the UPDATE_TOPIC command; the response body is ignored.</summary>
    public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, string name,
        CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None,
        ulong maxTopicSize = 0, ulong messageExpiry = 0, byte? replicationFactor = null,
        CancellationToken token = default)
    {
        var message = TcpContracts.UpdateTopic(streamId, topicId, name, compressionAlgorithm, maxTopicSize,
            messageExpiry, replicationFactor);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_TOPIC_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the DELETE_TOPIC command; the response body is ignored.</summary>
    public async Task DeleteTopicAsync(Identifier streamId, Identifier topicId, CancellationToken token = default)
    {
        var message = TcpContracts.DeleteTopic(streamId, topicId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_TOPIC_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the PURGE_TOPIC command; the response body is ignored.</summary>
    public async Task PurgeTopicAsync(Identifier streamId, Identifier topicId, CancellationToken token = default)
    {
        var message = TcpContracts.PurgeTopic(streamId, topicId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.PURGE_TOPIC_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>
    /// Serializes <paramref name="messages" /> into pooled buffers and sends them with the SEND_MESSAGES
    /// command; the response body is ignored.
    /// </summary>
    public async Task SendMessagesAsync(Identifier streamId, Identifier topicId, Partitioning partitioning,
        IList<Message> messages,
        CancellationToken token = default)
    {
        var metadataLength = 2 + streamId.Length + 2 + topicId.Length
                             + 2 + partitioning.Length + 4 + 4;
        var messageBufferSize = TcpMessageStreamHelpers.CalculateMessageBytesCount(messages)
                                + metadataLength;
        var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);

        // `using` declarations return both rentals to the pool on every exit path, including the case
        // where the second Rent call itself throws.
        using IMemoryOwner<byte> messageBuffer = MemoryPool<byte>.Shared.Rent(messageBufferSize);
        using IMemoryOwner<byte> payloadBuffer = MemoryPool<byte>.Shared.Rent(payloadBufferSize);

        // Rented memory may be larger than requested, so every use is sliced to the exact size.
        TcpContracts.CreateMessage(messageBuffer.Memory.Span[..messageBufferSize], streamId,
            topicId, partitioning, messages);
        TcpMessageStreamHelpers.CreatePayload(payloadBuffer.Memory.Span[..payloadBufferSize],
            messageBuffer.Memory.Span[..messageBufferSize], CommandCodes.SEND_MESSAGES_CODE);

        await SendWithResponseAsync(payloadBuffer.Memory[..payloadBufferSize].ToArray(), token);
    }

    /// <summary>Sends the FLUSH_UNSAVED_BUFFER command; the response body is ignored.</summary>
    public async Task FlushUnsavedBufferAsync(Identifier streamId, Identifier topicId, uint partitionId, bool fsync,
        CancellationToken token = default)
    {
        var message = TcpContracts.FlushUnsavedBuffer(streamId, topicId, partitionId, fsync);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.FLUSH_UNSAVED_BUFFER_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the POLL_MESSAGES command.</summary>
    /// <returns>The result of mapping the response body (mapped even when the body is empty).</returns>
    public async Task<PolledMessages> PollMessagesAsync(Identifier streamId, Identifier topicId, uint? partitionId,
        Consumer consumer,
        PollingStrategy pollingStrategy, uint count, bool autoCommit, CancellationToken token = default)
    {
        var message = new byte[CalculateMessageBufferSize(streamId, topicId, consumer)];
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];

        TcpContracts.GetMessages(message.AsSpan(), consumer, streamId,
            topicId, pollingStrategy, count, autoCommit, partitionId);
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.POLL_MESSAGES_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        return BinaryMapper.MapMessages(responseBuffer);
    }

    /// <summary>Sends the STORE_CONSUMER_OFFSET command; the response body is ignored.</summary>
    public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId, ulong offset,
        uint? partitionId, CancellationToken token = default)
    {
        var message = TcpContracts.UpdateOffset(streamId, topicId, consumer, offset, partitionId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.STORE_CONSUMER_OFFSET_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the GET_CONSUMER_OFFSET command.</summary>
    /// <returns>The mapped offset, or <c>null</c> when the response body is empty.</returns>
    public async Task<OffsetResponse?> GetOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId,
        uint? partitionId, CancellationToken token = default)
    {
        var message = TcpContracts.GetOffset(streamId, topicId, consumer, partitionId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_OFFSET_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapOffsets(responseBuffer);
    }

    /// <summary>Sends the DELETE_CONSUMER_OFFSET command; the response body is ignored.</summary>
    public async Task DeleteOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId, uint? partitionId,
        CancellationToken token = default)
    {
        var message = TcpContracts.DeleteOffset(streamId, topicId, consumer, partitionId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_CONSUMER_OFFSET_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the GET_CONSUMER_GROUPS command.</summary>
    /// <returns>The mapped consumer groups, or an empty list when the response body is empty.</returns>
    public async Task<IReadOnlyList<ConsumerGroupResponse>> GetConsumerGroupsAsync(Identifier streamId,
        Identifier topicId,
        CancellationToken token = default)
    {
        var message = TcpContracts.GetGroups(streamId, topicId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_GROUPS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return [];
        }

        return BinaryMapper.MapConsumerGroups(responseBuffer);
    }

    /// <summary>Sends the GET_CONSUMER_GROUP command.</summary>
    /// <returns>The mapped consumer group, or <c>null</c> when the response body is empty.</returns>
    public async Task<ConsumerGroupResponse?> GetConsumerGroupByIdAsync(Identifier streamId, Identifier topicId,
        Identifier groupId, CancellationToken token = default)
    {
        var message = TcpContracts.GetGroup(streamId, topicId, groupId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_GROUP_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapConsumerGroup(responseBuffer);
    }

    /// <summary>Sends the CREATE_CONSUMER_GROUP command.</summary>
    /// <returns>The mapped consumer group, or <c>null</c> when the response body is empty.</returns>
    public async Task<ConsumerGroupResponse?> CreateConsumerGroupAsync(Identifier streamId, Identifier topicId,
        string name, uint? groupId, CancellationToken token = default)
    {
        var message = TcpContracts.CreateGroup(streamId, topicId, name, groupId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_CONSUMER_GROUP_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapConsumerGroup(responseBuffer);
    }

    /// <summary>Sends the DELETE_CONSUMER_GROUP command; the response body is ignored.</summary>
    public async Task DeleteConsumerGroupAsync(Identifier streamId, Identifier topicId, Identifier groupId,
        CancellationToken token = default)
    {
        var message = TcpContracts.DeleteGroup(streamId, topicId, groupId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_CONSUMER_GROUP_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the JOIN_CONSUMER_GROUP command; the response body is ignored.</summary>
    public async Task JoinConsumerGroupAsync(Identifier streamId, Identifier topicId, Identifier groupId,
        CancellationToken token = default)
    {
        var message = TcpContracts.JoinGroup(streamId, topicId, groupId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.JOIN_CONSUMER_GROUP_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the LEAVE_CONSUMER_GROUP command; the response body is ignored.</summary>
    public async Task LeaveConsumerGroupAsync(Identifier streamId, Identifier topicId, Identifier groupId,
        CancellationToken token = default)
    {
        var message = TcpContracts.LeaveGroup(streamId, topicId, groupId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LEAVE_CONSUMER_GROUP_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the DELETE_PARTITIONS command; the response body is ignored.</summary>
    public async Task DeletePartitionsAsync(Identifier streamId, Identifier topicId, uint partitionsCount,
        CancellationToken token = default)
    {
        var message = TcpContracts.DeletePartitions(streamId, topicId, partitionsCount);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PARTITIONS_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the CREATE_PARTITIONS command; the response body is ignored.</summary>
    public async Task CreatePartitionsAsync(Identifier streamId, Identifier topicId, uint partitionsCount,
        CancellationToken token = default)
    {
        var message = TcpContracts.CreatePartitions(streamId, topicId, partitionsCount);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PARTITIONS_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the GET_ME command (no request body).</summary>
    /// <returns>The mapped client, or <c>null</c> when the response body is empty.</returns>
    public async Task<ClientResponse?> GetMeAsync(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_ME_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapClient(responseBuffer);
    }

    /// <summary>Sends the GET_STATS command (no request body).</summary>
    /// <returns>The mapped statistics, or <c>null</c> when the response body is empty.</returns>
    public async Task<StatsResponse?> GetStatsAsync(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STATS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapStats(responseBuffer);
    }

    /// <summary>Sends the PING command (no request body); the response body is ignored.</summary>
    public async Task PingAsync(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.PING_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the GET_CLIENTS command (no request body).</summary>
    /// <returns>The mapped clients, or an empty list when the response body is empty.</returns>
    public async Task<IReadOnlyList<ClientResponse>> GetClientsAsync(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLIENTS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return [];
        }

        return BinaryMapper.MapClients(responseBuffer);
    }

    /// <summary>Sends the GET_CLIENT command for <paramref name="clientId" />.</summary>
    /// <returns>The mapped client, or <c>null</c> when the response body is empty.</returns>
    public async Task<ClientResponse?> GetClientByIdAsync(uint clientId, CancellationToken token = default)
    {
        var message = TcpContracts.GetClient(clientId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLIENT_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapClient(responseBuffer);
    }

    /// <summary>Sends the GET_USER command for <paramref name="userId" />.</summary>
    /// <returns>The mapped user, or <c>null</c> when the response body is empty.</returns>
    public async Task<UserResponse?> GetUser(Identifier userId, CancellationToken token = default)
    {
        var message = TcpContracts.GetUser(userId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_USER_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapUser(responseBuffer);
    }

    /// <summary>Sends the GET_USERS command (no request body).</summary>
    /// <returns>The mapped users, or an empty list when the response body is empty.</returns>
    public async Task<IReadOnlyList<UserResponse>> GetUsers(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_USERS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return [];
        }

        return BinaryMapper.MapUsers(responseBuffer);
    }

    /// <summary>Sends the CREATE_USER command.</summary>
    /// <returns>The mapped user, or <c>null</c> when the response body is empty.</returns>
    public async Task<UserResponse?> CreateUser(string userName, string password, UserStatus status,
        Permissions? permissions = null, CancellationToken token = default)
    {
        var message = TcpContracts.CreateUser(userName, password, status, permissions);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_USER_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapUser(responseBuffer);
    }

    /// <summary>Sends the DELETE_USER command; the response body is ignored.</summary>
    public async Task DeleteUser(Identifier userId, CancellationToken token = default)
    {
        var message = TcpContracts.DeleteUser(userId);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_USER_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the UPDATE_USER command; the response body is ignored.</summary>
    public async Task UpdateUser(Identifier userId, string? userName = null, UserStatus? status = null,
        CancellationToken token = default)
    {
        var message = TcpContracts.UpdateUser(userId, userName, status);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_USER_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the UPDATE_PERMISSIONS command; the response body is ignored.</summary>
    public async Task UpdatePermissions(Identifier userId, Permissions? permissions = null,
        CancellationToken token = default)
    {
        var message = TcpContracts.UpdatePermissions(userId, permissions);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_PERMISSIONS_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the CHANGE_PASSWORD command; the response body is ignored.</summary>
    public async Task ChangePassword(Identifier userId, string currentPassword, string newPassword,
        CancellationToken token = default)
    {
        var message = TcpContracts.ChangePassword(userId, currentPassword, newPassword);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CHANGE_PASSWORD_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>
    /// Sends the LOGIN_USER command, identifying this client as version "0.5.0" with context "csharp-sdk".
    /// </summary>
    /// <returns>
    /// An <see cref="AuthResponse" /> carrying the user id read from the first four bytes of the response
    /// (little-endian), or <c>null</c> when the response is too short to contain a user id.
    /// </returns>
    public async Task<AuthResponse?> LoginUser(string userName, string password, CancellationToken token = default)
    {
        var message = TcpContracts.LoginUser(userName, password, "0.5.0", "csharp-sdk");
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_USER_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);

        // A body shorter than an Int32 cannot hold the user id; reading it would throw
        // ArgumentOutOfRangeException, so treat it like an empty response.
        if (responseBuffer.Length < sizeof(int))
        {
            return null;
        }

        var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer);
        return new AuthResponse(userId, null);
    }

    /// <summary>Sends the LOGOUT_USER command (no request body); the response body is ignored.</summary>
    public async Task LogoutUser(CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGOUT_USER_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the GET_PERSONAL_ACCESS_TOKENS command (no request body).</summary>
    /// <returns>The mapped tokens, or an empty list when the response body is empty.</returns>
    public async Task<IReadOnlyList<PersonalAccessTokenResponse>> GetPersonalAccessTokensAsync(
        CancellationToken token = default)
    {
        var message = Array.Empty<byte>();
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_PERSONAL_ACCESS_TOKENS_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return [];
        }

        return BinaryMapper.MapPersonalAccessTokens(responseBuffer);
    }

    /// <summary>Sends the CREATE_PERSONAL_ACCESS_TOKEN command.</summary>
    /// <returns>The mapped raw token, or <c>null</c> when the response body is empty.</returns>
    public async Task<RawPersonalAccessToken?> CreatePersonalAccessTokenAsync(string name, ulong? expiry = 0,
        CancellationToken token = default)
    {
        var message = TcpContracts.CreatePersonalAccessToken(name, expiry);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, token);
        if (responseBuffer.Length == 0)
        {
            return null;
        }

        return BinaryMapper.MapRawPersonalAccessToken(responseBuffer);
    }

    /// <summary>Sends the DELETE_PERSONAL_ACCESS_TOKEN command; the response body is ignored.</summary>
    public async Task DeletePersonalAccessTokenAsync(string name, CancellationToken token = default)
    {
        var message = TcpContracts.DeletePersonalRequestToken(name);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PERSONAL_ACCESS_TOKEN_CODE);

        await SendWithResponseAsync(payload, token);
    }

    /// <summary>Sends the LOGIN_WITH_PERSONAL_ACCESS_TOKEN command.</summary>
    /// <param name="token">The personal access token to authenticate with.</param>
    /// <param name="ct">Cancellation token for the request.</param>
    /// <returns>
    /// An <see cref="AuthResponse" /> carrying the user id read from the first four bytes of the response
    /// (little-endian), or <c>null</c> when the response is too short to contain a user id.
    /// </returns>
    public async Task<AuthResponse?> LoginWithPersonalAccessToken(string token, CancellationToken ct = default)
    {
        var message = TcpContracts.LoginWithPersonalAccessToken(token);
        var payload = new byte[CalculatePayloadBufferSize(message.Length)];
        TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);

        var responseBuffer = await SendWithResponseAsync(payload, ct);

        // The user id occupies four bytes; anything shorter cannot be parsed and previously made the
        // slice below throw for 2- and 3-byte bodies.
        if (responseBuffer.Length < sizeof(int))
        {
            return null;
        }

        var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer);

        //TODO: Figure out how to solve this workaround about default of TokenInfo
        return new AuthResponse(userId, default);
    }

    // //TODO - look into calling the non generic FetchMessagesAsync method in order
    // //to make this method re-usable for non generic PollMessages method.
    // private async Task StartPollingMessagesAsync<TMessage>(MessageFetchRequest request,
    // Func<byte[], TMessage> deserializer, TimeSpan interval, ChannelWriter<MessageResponse<TMessage>> writer,
    // Func<byte[], byte[]>? decryptor = null,
    // CancellationToken token = default)
    // {
    // var timer = new PeriodicTimer(interval);
    // while (await timer.WaitForNextTickAsync(token) || token.IsCancellationRequested)
    // {
    // try
    // {
    // PolledMessages<TMessage> fetchResponse
    // = await PollMessagesAsync(request, deserializer, decryptor, token);
    // if (fetchResponse.Messages.Count == 0)
    // {
    // continue;
    // }
    //
    // foreach (MessageResponse<TMessage> messageResponse in fetchResponse.Messages)
    // {
    // await writer.WriteAsync(messageResponse, token);
    // }
    // }
    // catch (InvalidResponseException e)
    // {
    // _logger.LogError(
    // "Error encountered while polling messages - Stream ID: {streamId}, Topic ID: {topicId}, Partition ID: {partitionId}, error message {message}",
    // request.StreamId, request.TopicId, request.PartitionId, e.Message);
    // }
    // }
    //
    // writer.Complete();
    // }

    /// <summary>
    /// Sends <paramref name="payload" /> over the stream and reads the response while holding the request
    /// semaphore, so exchanges from concurrent callers never interleave.
    /// </summary>
    /// <param name="payload">Fully framed request bytes.</param>
    /// <param name="token">Cancels waiting for the semaphore as well as the send and read operations.</param>
    /// <returns>The response body, or an empty array when the response header reports length 0.</returns>
    /// <exception cref="InvalidResponseException">
    /// The first read returned 0 bytes, or the response status was non-zero (the message is the UTF-8
    /// decoded error body when one is present).
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="token" /> was cancelled while waiting for the semaphore.
    /// </exception>
    private async Task<byte[]> SendWithResponseAsync(byte[] payload, CancellationToken token = default)
    {
        // Acquire BEFORE entering the try block. If the wait is cancelled or throws, the semaphore was
        // never taken; releasing it in finally would either throw SemaphoreFullException (hiding the
        // cancellation) or free a slot owned by another in-flight request.
        await _semaphore.WaitAsync(token);
        try
        {
            await _stream.SendAsync(payload, token);
            await _stream.FlushAsync(token);

            // NOTE(review): the read calls below do not verify that the requested number of bytes was
            // actually received. If IConnectionStream.ReadAsync can return a partial read (as raw
            // network streams may), header and body reads should loop until complete - confirm against
            // the IConnectionStream implementation.
            var header = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
            var readBytes = await _stream.ReadAsync(header, token);
            if (readBytes == 0)
            {
                throw new InvalidResponseException("Received empty response from server or connection was closed");
            }

            var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(header);
            if (response.Status != 0)
            {
                if (response.Length == 0)
                {
                    throw new InvalidResponseException($"Invalid response status code: {response.Status}");
                }

                // A failed request may carry a textual error body; surface it as the exception message.
                var errorBuffer = new byte[response.Length];
                await _stream.ReadAsync(errorBuffer, token);
                throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
            }

            if (response.Length == 0)
            {
                return [];
            }

            var responseBuffer = new byte[response.Length];
            await _stream.ReadAsync(responseBuffer, token);
            return responseBuffer;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Returns the size of a framed request: the message body plus 4 bytes plus
    /// <c>BufferSizes.INITIAL_BYTES_LENGTH</c>.
    /// </summary>
    private static int CalculatePayloadBufferSize(int messageBufferSize)
    {
        return messageBufferSize + 4 + BufferSizes.INITIAL_BYTES_LENGTH;
    }

    /// <summary>
    /// Returns the size of a poll-messages request body: 19 fixed bytes (14 + 5) plus, for each of the
    /// stream, topic and consumer identifiers, 2 bytes and the identifier's own length.
    /// </summary>
    private static int CalculateMessageBufferSize(Identifier streamId, Identifier topicId, Consumer consumer)
    {
        return 14 + 5 + 2 + streamId.Length + 2 + topicId.Length + 2 + consumer.Id.Length;
    }
}