| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System.Net; |
| using System.Net.Http.Headers; |
| using System.Net.Http.Json; |
| using System.Text; |
| using System.Text.Json; |
| using System.Text.Json.Serialization; |
| using Apache.Iggy.Contracts; |
| using Apache.Iggy.Contracts.Auth; |
| using Apache.Iggy.Contracts.Http; |
| using Apache.Iggy.Contracts.Http.Auth; |
| using Apache.Iggy.Enums; |
| using Apache.Iggy.Exceptions; |
| using Apache.Iggy.Kinds; |
| using Apache.Iggy.Messages; |
| using Apache.Iggy.StringHandlers; |
| using Partitioning = Apache.Iggy.Kinds.Partitioning; |
| |
| namespace Apache.Iggy.IggyClient.Implementations; |
| |
| public class HttpMessageStream : IIggyClient |
| { |
| private const string Context = "csharp-sdk"; |
| |
| private readonly HttpClient _httpClient; |
| |
| //TODO - create mechanism for refreshing jwt token |
| //TODO - replace the HttpClient with IHttpClientFactory, when implementing support for ASP.NET Core DI |
| //TODO - the error handling pattern is pretty ugly, look into moving it into an extension method |
| private readonly JsonSerializerOptions _jsonSerializerOptions; |
| |
/// <summary>
/// Creates a client that issues all of its requests through the supplied <see cref="HttpClient"/>.
/// </summary>
/// <param name="httpClient">HTTP client used for every request made by this instance.</param>
internal HttpMessageStream(HttpClient httpClient)
{
    // Request and response payloads use snake_case for property names and for enum values.
    var serializerOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower
    };
    serializerOptions.Converters.Add(new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower));

    _jsonSerializerOptions = serializerOptions;
    _httpClient = httpClient;
}
| |
/// <summary>
/// POSTs a serialized <see cref="CreateStreamRequest"/> to <c>/streams</c>.
/// </summary>
/// <param name="name">Stream name placed in the request.</param>
/// <param name="streamId">Optional stream id placed in the request.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="StreamResponse"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<StreamResponse?> CreateStreamAsync(string name, uint? streamId = null,
    CancellationToken token = default)
{
    var payload = JsonSerializer.Serialize(new CreateStreamRequest(streamId, name), _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PostAsync("/streams", body, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<StreamResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Sends a DELETE request to <c>/streams/{streamId}/purge</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task PurgeStreamAsync(Identifier streamId, CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/purge";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a DELETE request to <c>/streams/{streamId}</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeleteStreamAsync(Identifier streamId, CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}</c> and reads the body as a <see cref="StreamResponse"/>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<StreamResponse?> GetStreamByIdAsync(Identifier streamId, CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}";
    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<StreamResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// PUTs a serialized <see cref="UpdateStreamRequest"/> to <c>/streams/{streamId}</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="name">Stream name placed in the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task UpdateStreamAsync(Identifier streamId, string name, CancellationToken token = default)
{
    var payload = JsonSerializer.Serialize(new UpdateStreamRequest(name), _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PutAsync($"/streams/{streamId}", body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to <c>/streams</c> and reads the body as a list of <see cref="StreamResponse"/>.
/// </summary>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized list on a success status, or an empty list when the body deserializes to
/// <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<IReadOnlyList<StreamResponse>> GetStreamsAsync(CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync("/streams", token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return Array.Empty<StreamResponse>();
    }

    var streams = await httpResponse.Content.ReadFromJsonAsync<IReadOnlyList<StreamResponse>>(
        _jsonSerializerOptions, token);
    return streams ?? Array.Empty<StreamResponse>();
}
| |
/// <summary>
/// POSTs a serialized <see cref="CreateTopicRequest"/> to <c>/streams/{streamId}/topics</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="name">Copied to the request's <c>Name</c>.</param>
/// <param name="partitionsCount">Copied to the request's <c>PartitionsCount</c>.</param>
/// <param name="compressionAlgorithm">Copied to the request's <c>CompressionAlgorithm</c>.</param>
/// <param name="topicId">Copied to the request's <c>TopicId</c>.</param>
/// <param name="replicationFactor">Copied to the request's <c>ReplicationFactor</c>.</param>
/// <param name="messageExpiry">Copied to the request's <c>MessageExpiry</c>.</param>
/// <param name="maxTopicSize">Copied to the request's <c>MaxTopicSize</c>.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="TopicResponse"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<TopicResponse?> CreateTopicAsync(Identifier streamId, string name, uint partitionsCount,
    CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None,
    uint? topicId = null, byte? replicationFactor = null, ulong messageExpiry = 0, ulong maxTopicSize = 0,
    CancellationToken token = default)
{
    var createTopic = new CreateTopicRequest
    {
        TopicId = topicId,
        Name = name,
        PartitionsCount = partitionsCount,
        CompressionAlgorithm = compressionAlgorithm,
        ReplicationFactor = replicationFactor,
        MessageExpiry = messageExpiry,
        MaxTopicSize = maxTopicSize
    };
    var payload = JsonSerializer.Serialize(createTopic, _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PostAsync($"/streams/{streamId}/topics", body, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<TopicResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// PUTs a serialized <see cref="UpdateTopicRequest"/> to <c>/streams/{streamId}/topics/{topicId}</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="name">Passed to the request.</param>
/// <param name="compressionAlgorithm">Passed to the request.</param>
/// <param name="maxTopicSize">Passed to the request.</param>
/// <param name="messageExpiry">Passed to the request.</param>
/// <param name="replicationFactor">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, string name,
    CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None,
    ulong maxTopicSize = 0, ulong messageExpiry = 0, byte? replicationFactor = null,
    CancellationToken token = default)
{
    var updateTopic = new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, messageExpiry,
        replicationFactor);
    var payload = JsonSerializer.Serialize(updateTopic, _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PutAsync($"/streams/{streamId}/topics/{topicId}", body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a DELETE request to <c>/streams/{streamId}/topics/{topicId}</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeleteTopicAsync(Identifier streamId, Identifier topicId, CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/topics/{topicId}";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a DELETE request to <c>/streams/{streamId}/topics/{topicId}/purge</c>.
/// </summary>
/// <remarks>
/// The request and the failure handling are awaited directly. The earlier <c>ContinueWith</c> form
/// returned the outer task of a nested task, so it completed before <c>HandleResponseAsync</c> ran
/// and any exception thrown there was never observed by the caller.
/// </remarks>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task PurgeTopicAsync(Identifier streamId, Identifier topicId, CancellationToken token = default)
{
    var response = await _httpClient.DeleteAsync($"/streams/{streamId}/topics/{topicId}/purge", token);
    if (!response.IsSuccessStatusCode)
    {
        await HandleResponseAsync(response);
    }
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}/topics</c> and reads the body as a list of
/// <see cref="TopicResponse"/>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized list on a success status, or an empty list when the body deserializes to
/// <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<IReadOnlyList<TopicResponse>> GetTopicsAsync(Identifier streamId,
    CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync($"/streams/{streamId}/topics", token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return Array.Empty<TopicResponse>();
    }

    var topics = await httpResponse.Content.ReadFromJsonAsync<IReadOnlyList<TopicResponse>>(
        _jsonSerializerOptions, token);
    return topics ?? Array.Empty<TopicResponse>();
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}/topics/{topicId}</c> and reads the body as a
/// <see cref="TopicResponse"/>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<TopicResponse?> GetTopicByIdAsync(Identifier streamId, Identifier topicId,
    CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/topics/{topicId}";
    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<TopicResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// POSTs a serialized <see cref="MessageSendRequest"/> to
/// <c>/streams/{streamId}/topics/{topicId}/messages</c>.
/// </summary>
/// <param name="streamId">Copied to the request's <c>StreamId</c>, which is also used in the path.</param>
/// <param name="topicId">Copied to the request's <c>TopicId</c>, which is also used in the path.</param>
/// <param name="partitioning">Copied to the request's <c>Partitioning</c>.</param>
/// <param name="messages">Copied to the request's <c>Messages</c>.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task SendMessagesAsync(Identifier streamId, Identifier topicId, Partitioning partitioning,
    IList<Message> messages,
    CancellationToken token = default)
{
    var sendRequest = new MessageSendRequest
    {
        StreamId = streamId,
        TopicId = topicId,
        Partitioning = partitioning,
        Messages = messages
    };
    var body = new StringContent(
        JsonSerializer.Serialize(sendRequest, _jsonSerializerOptions),
        Encoding.UTF8,
        "application/json");

    // The path is built from the values stored on the request object.
    var requestUri = $"/streams/{sendRequest.StreamId}/topics/{sendRequest.TopicId}/messages";
    var httpResponse = await _httpClient.PostAsync(requestUri, body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to
/// <c>/streams/{streamId}/topics/{topicId}/messages/flush/{partitionId}/{fsync}</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="partitionId">Partition id interpolated into the request path.</param>
/// <param name="fsync">Flag interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task FlushUnsavedBufferAsync(Identifier streamId, Identifier topicId, uint partitionId, bool fsync,
    CancellationToken token = default)
{
    // The interpolated string is passed straight to CreateUrl so its custom handler formats the values.
    var flushUri = CreateUrl($"/streams/{streamId}/topics/{topicId}/messages/flush/{partitionId}/{fsync}");

    var httpResponse = await _httpClient.GetAsync(flushUri, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // shouldThrowOnGetNotFound is true for this GET.
    await HandleResponseAsync(httpResponse, true);
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}/topics/{topicId}/messages</c> with the polling
/// options in the query string and reads the body as <see cref="PolledMessages"/>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="partitionId">When set, added as the <c>partition_id</c> query parameter.</param>
/// <param name="consumer">Its <c>Id</c> is sent as the <c>consumer_id</c> query parameter.</param>
/// <param name="pollingStrategy">Its <c>Kind</c> and <c>Value</c> are sent as <c>kind</c> and <c>value</c>.</param>
/// <param name="count">Sent as the <c>count</c> query parameter.</param>
/// <param name="autoCommit">Sent as the <c>auto_commit</c> query parameter.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized messages on a success status, or <see cref="PolledMessages.Empty"/> when the body
/// deserializes to <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<PolledMessages> PollMessagesAsync(Identifier streamId, Identifier topicId, uint? partitionId,
    Consumer consumer,
    PollingStrategy pollingStrategy, uint count, bool autoCommit, CancellationToken token = default)
{
    var partitionIdParam = partitionId is null ? string.Empty : $"&partition_id={partitionId.Value}";

    // The interpolated string is passed straight to CreateUrl so its custom handler formats the values.
    var pollUri = CreateUrl($"/streams/{streamId}/topics/{topicId}/messages?consumer_id={consumer.Id}" +
                            $"{partitionIdParam}&kind={pollingStrategy.Kind}&value={pollingStrategy.Value}&count={count}&auto_commit={autoCommit}");

    var httpResponse = await _httpClient.GetAsync(pollUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        // shouldThrowOnGetNotFound is true for this GET.
        await HandleResponseAsync(httpResponse, true);
        return PolledMessages.Empty;
    }

    return await httpResponse.Content.ReadFromJsonAsync<PolledMessages>(_jsonSerializerOptions, token)
           ?? PolledMessages.Empty;
}
| |
/// <summary>
/// PUTs a serialized <see cref="StoreOffsetRequest"/> to
/// <c>/streams/{streamId}/topics/{topicId}/consumer-offsets</c>.
/// </summary>
/// <param name="consumer">Passed to the request.</param>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="offset">Passed to the request.</param>
/// <param name="partitionId">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId, ulong offset,
    uint? partitionId, CancellationToken token = default)
{
    var storeOffset = new StoreOffsetRequest(consumer, partitionId, offset);
    var payload = JsonSerializer.Serialize(storeOffset, _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var requestUri = $"/streams/{streamId}/topics/{topicId}/consumer-offsets";
    var httpResponse = await _httpClient.PutAsync(requestUri, body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}/topics/{topicId}/consumer-offsets</c> and reads the
/// body as an <see cref="OffsetResponse"/>.
/// </summary>
/// <param name="consumer">Its <c>Id</c> is sent as the <c>consumer_id</c> query parameter.</param>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="partitionId">When set, added as the <c>partition_id</c> query parameter.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<OffsetResponse?> GetOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId,
    uint? partitionId, CancellationToken token = default)
{
    var partitionQuery = partitionId is null ? string.Empty : $"&partition_id={partitionId.Value}";
    var requestUri =
        $"/streams/{streamId}/topics/{topicId}/consumer-offsets?consumer_id={consumer.Id}{partitionQuery}";

    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<OffsetResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Sends a DELETE request to
/// <c>/streams/{streamId}/topics/{topicId}/consumer-offsets/{consumer}</c>.
/// </summary>
/// <param name="consumer">Interpolated into the request path.</param>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="partitionId">When set, added as the <c>partition_id</c> query parameter.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeleteOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId, uint? partitionId,
    CancellationToken token = default)
{
    var partitionQuery = partitionId is null ? string.Empty : $"?partition_id={partitionId.Value}";

    // NOTE(review): the path embeds the string form of `consumer`, whereas GetOffsetAsync sends
    // consumer.Id - confirm this is intended.
    var requestUri = $"/streams/{streamId}/topics/{topicId}/consumer-offsets/{consumer}{partitionQuery}";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);

    // The handler is called for every response, without an IsSuccessStatusCode check first.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}/topics/{topicId}/consumer-groups</c> and reads the
/// body as a list of <see cref="ConsumerGroupResponse"/>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized list on a success status, or an empty list when the body deserializes to
/// <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<IReadOnlyList<ConsumerGroupResponse>> GetConsumerGroupsAsync(Identifier streamId,
    Identifier topicId, CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/topics/{topicId}/consumer-groups";
    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return Array.Empty<ConsumerGroupResponse>();
    }

    var groups = await httpResponse.Content.ReadFromJsonAsync<IReadOnlyList<ConsumerGroupResponse>>(
        _jsonSerializerOptions, token);
    return groups ?? Array.Empty<ConsumerGroupResponse>();
}
| |
/// <summary>
/// Sends a GET request to <c>/streams/{streamId}/topics/{topicId}/consumer-groups/{groupId}</c> and
/// reads the body as a <see cref="ConsumerGroupResponse"/>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="groupId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<ConsumerGroupResponse?> GetConsumerGroupByIdAsync(Identifier streamId, Identifier topicId,
    Identifier groupId, CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/topics/{topicId}/consumer-groups/{groupId}";
    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<ConsumerGroupResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// POSTs a serialized <see cref="CreateConsumerGroupRequest"/> to
/// <c>/streams/{streamId}/topics/{topicId}/consumer-groups</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="name">Passed to the request.</param>
/// <param name="groupId">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="ConsumerGroupResponse"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<ConsumerGroupResponse?> CreateConsumerGroupAsync(Identifier streamId, Identifier topicId,
    string name, uint? groupId, CancellationToken token = default)
{
    var payload = JsonSerializer.Serialize(new CreateConsumerGroupRequest(name, groupId), _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var requestUri = $"/streams/{streamId}/topics/{topicId}/consumer-groups";
    var httpResponse = await _httpClient.PostAsync(requestUri, body, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<ConsumerGroupResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Sends a DELETE request to <c>/streams/{streamId}/topics/{topicId}/consumer-groups/{groupId}</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="groupId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeleteConsumerGroupAsync(Identifier streamId, Identifier topicId, Identifier groupId,
    CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/topics/{topicId}/consumer-groups/{groupId}";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);

    // The handler is called for every response, without an IsSuccessStatusCode check first.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Not available on this client: always throws <see cref="FeatureUnavailableException"/>.
/// </summary>
/// <param name="token">Unused.</param>
/// <exception cref="FeatureUnavailableException">Thrown synchronously on every call.</exception>
public Task<ClientResponse?> GetMeAsync(CancellationToken token = default)
    => throw new FeatureUnavailableException();
| |
/// <summary>
/// Sends a GET request to <c>/stats</c> and reads the body as a <see cref="StatsResponse"/>.
/// </summary>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<StatsResponse?> GetStatsAsync(CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync("/stats", token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<StatsResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Sends a GET request to <c>/ping</c>.
/// </summary>
/// <param name="token">Cancels the HTTP call.</param>
public async Task PingAsync(CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync("/ping", token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to <c>/clients</c> and reads the body as a list of <see cref="ClientResponse"/>.
/// </summary>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized list on a success status, or an empty list when the body deserializes to
/// <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<IReadOnlyList<ClientResponse>> GetClientsAsync(CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync("/clients", token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return Array.Empty<ClientResponse>();
    }

    var clients = await httpResponse.Content.ReadFromJsonAsync<IReadOnlyList<ClientResponse>>(
        _jsonSerializerOptions, token);
    return clients ?? Array.Empty<ClientResponse>();
}
| |
/// <summary>
/// Sends a GET request to <c>/clients/{clientId}</c> and reads the body as a <see cref="ClientResponse"/>.
/// </summary>
/// <param name="clientId">Client id interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<ClientResponse?> GetClientByIdAsync(uint clientId, CancellationToken token = default)
{
    var requestUri = $"/clients/{clientId}";
    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<ClientResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Not available on this client: always throws <see cref="FeatureUnavailableException"/>.
/// The <see cref="ObsoleteAttribute"/> is declared with its error flag set to <c>true</c>.
/// </summary>
/// <param name="streamId">Unused.</param>
/// <param name="topicId">Unused.</param>
/// <param name="groupId">Unused.</param>
/// <param name="token">Unused.</param>
/// <exception cref="FeatureUnavailableException">Thrown synchronously on every call.</exception>
[Obsolete("This method is only supported in TCP protocol", true)]
public Task JoinConsumerGroupAsync(Identifier streamId, Identifier topicId, Identifier groupId,
    CancellationToken token = default)
    => throw new FeatureUnavailableException();
| |
/// <summary>
/// Not available on this client: always throws <see cref="FeatureUnavailableException"/>.
/// The <see cref="ObsoleteAttribute"/> is declared with its error flag set to <c>true</c>.
/// </summary>
/// <param name="streamId">Unused.</param>
/// <param name="topicId">Unused.</param>
/// <param name="groupId">Unused.</param>
/// <param name="token">Unused.</param>
/// <exception cref="FeatureUnavailableException">Thrown synchronously on every call.</exception>
[Obsolete("This method is only supported in TCP protocol", true)]
public Task LeaveConsumerGroupAsync(Identifier streamId, Identifier topicId, Identifier groupId,
    CancellationToken token = default)
    => throw new FeatureUnavailableException();
| |
/// <summary>
/// Sends a DELETE request to <c>/streams/{streamId}/topics/{topicId}/partitions</c> with the count in
/// the <c>partitions_count</c> query parameter.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="partitionsCount">Value sent as <c>partitions_count</c>.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeletePartitionsAsync(Identifier streamId, Identifier topicId, uint partitionsCount,
    CancellationToken token = default)
{
    var requestUri = $"/streams/{streamId}/topics/{topicId}/partitions?partitions_count={partitionsCount}";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// POSTs a serialized <see cref="CreatePartitionsRequest"/> to
/// <c>/streams/{streamId}/topics/{topicId}/partitions</c>.
/// </summary>
/// <param name="streamId">Identifier interpolated into the request path.</param>
/// <param name="topicId">Identifier interpolated into the request path.</param>
/// <param name="partitionsCount">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task CreatePartitionsAsync(Identifier streamId, Identifier topicId, uint partitionsCount,
    CancellationToken token = default)
{
    var payload = JsonSerializer.Serialize(new CreatePartitionsRequest(partitionsCount), _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var requestUri = $"/streams/{streamId}/topics/{topicId}/partitions";
    var httpResponse = await _httpClient.PostAsync(requestUri, body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// Sends a GET request to <c>/users/{userId}</c> and reads the body as a <see cref="UserResponse"/>.
/// </summary>
/// <param name="userId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized response on a success status; otherwise <c>null</c> when
/// <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<UserResponse?> GetUser(Identifier userId, CancellationToken token = default)
{
    //TODO - this doesn't work prob needs a custom json serializer
    var requestUri = $"/users/{userId}";
    var httpResponse = await _httpClient.GetAsync(requestUri, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<UserResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Sends a GET request to <c>/users</c> and reads the body as a list of <see cref="UserResponse"/>.
/// </summary>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized list on a success status, or an empty list when the body deserializes to
/// <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<IReadOnlyList<UserResponse>> GetUsers(CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync("/users", token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return Array.Empty<UserResponse>();
    }

    var users = await httpResponse.Content.ReadFromJsonAsync<IReadOnlyList<UserResponse>>(
        _jsonSerializerOptions, token);
    return users ?? Array.Empty<UserResponse>();
}
| |
/// <summary>
/// POSTs a serialized <see cref="CreateUserRequest"/> to <c>/users</c>.
/// </summary>
/// <param name="userName">Passed to the request.</param>
/// <param name="password">Passed to the request.</param>
/// <param name="status">Passed to the request.</param>
/// <param name="permissions">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="UserResponse"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<UserResponse?> CreateUser(string userName, string password, UserStatus status,
    Permissions? permissions = null, CancellationToken token = default)
{
    var createUser = new CreateUserRequest(userName, password, status, permissions);
    var payload = JsonSerializer.Serialize(createUser, _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PostAsync("/users", body, token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return null;
    }

    return await httpResponse.Content.ReadFromJsonAsync<UserResponse>(_jsonSerializerOptions, token);
}
| |
/// <summary>
/// Sends a DELETE request to <c>/users/{userId}</c>.
/// </summary>
/// <param name="userId">Identifier interpolated into the request path.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeleteUser(Identifier userId, CancellationToken token = default)
{
    var requestUri = $"/users/{userId}";
    var httpResponse = await _httpClient.DeleteAsync(requestUri, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    // Non-success responses are handed to the shared handler.
    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// PUTs a serialized <see cref="UpdateUserRequest"/> to <c>/users/{userId}</c>.
/// </summary>
/// <param name="userId">Identifier interpolated into the request path.</param>
/// <param name="userName">Passed to the request.</param>
/// <param name="status">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task UpdateUser(Identifier userId, string? userName = null, UserStatus? status = null,
    CancellationToken token = default)
{
    var payload = JsonSerializer.Serialize(new UpdateUserRequest(userName, status), _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PutAsync($"/users/{userId}", body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// PUTs a serialized <see cref="UpdateUserPermissionsRequest"/> to <c>/users/{userId}/permissions</c>.
/// </summary>
/// <param name="userId">Identifier interpolated into the request path.</param>
/// <param name="permissions">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task UpdatePermissions(Identifier userId, Permissions? permissions = null,
    CancellationToken token = default)
{
    var payload = JsonSerializer.Serialize(new UpdateUserPermissionsRequest(permissions), _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PutAsync($"/users/{userId}/permissions", body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// PUTs a serialized <see cref="ChangePasswordRequest"/> to <c>/users/{userId}/password</c>.
/// </summary>
/// <param name="userId">Identifier interpolated into the request path.</param>
/// <param name="currentPassword">Passed to the request.</param>
/// <param name="newPassword">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task ChangePassword(Identifier userId, string currentPassword, string newPassword,
    CancellationToken token = default)
{
    var changePassword = new ChangePasswordRequest(currentPassword, newPassword);
    var payload = JsonSerializer.Serialize(changePassword, _jsonSerializerOptions);
    var body = new StringContent(payload, Encoding.UTF8, "application/json");

    var httpResponse = await _httpClient.PutAsync($"/users/{userId}/password", body, token);
    if (httpResponse.IsSuccessStatusCode)
    {
        return;
    }

    await HandleResponseAsync(httpResponse);
}
| |
/// <summary>
/// POSTs a serialized <see cref="LoginUserRequest"/> to <c>users/login</c> and, on success, installs
/// the returned JWT as the default <c>Bearer</c> Authorization header of the HTTP client.
/// </summary>
/// <param name="userName">Passed to the request.</param>
/// <param name="password">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="AuthResponse"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
/// <exception cref="Exception">
/// The response succeeded but the body, its access token, or the token string was null or empty.
/// </exception>
public async Task<AuthResponse?> LoginUser(string userName, string password, CancellationToken token = default)
{
    // TODO: get version
    var json = JsonSerializer.Serialize(new LoginUserRequest(userName, password, "", Context),
        _jsonSerializerOptions);

    var data = new StringContent(json, Encoding.UTF8, "application/json");

    // NOTE(review): unlike most endpoints in this class, this path has no leading slash - confirm intended.
    var response = await _httpClient.PostAsync("users/login", data, token);
    if (response.IsSuccessStatusCode)
    {
        var authResponse = await response.Content.ReadFromJsonAsync<AuthResponse>(_jsonSerializerOptions, token);

        // Null-conditional access: a null body or access token must surface as the
        // "missing token" error below rather than as a NullReferenceException.
        var jwtToken = authResponse?.AccessToken?.Token;
        if (string.IsNullOrEmpty(jwtToken))
        {
            throw new Exception("The JWT token is missing.");
        }

        _httpClient.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Bearer", jwtToken);

        return authResponse;
    }

    await HandleResponseAsync(response);
    return null;
}
| |
/// <summary>
/// Sends a DELETE request to <c>users/logout</c> and then clears the default Authorization header
/// of the HTTP client.
/// </summary>
/// <remarks>
/// The header is cleared only if <c>HandleResponseAsync</c> does not throw for a failed response.
/// </remarks>
/// <param name="token">Cancels the HTTP call.</param>
public async Task LogoutUser(CancellationToken token = default)
{
    var httpResponse = await _httpClient.DeleteAsync("users/logout", token);

    var succeeded = httpResponse.IsSuccessStatusCode;
    if (succeeded is false)
    {
        await HandleResponseAsync(httpResponse);
    }

    // Reached on success, and on failures the handler did not throw for.
    _httpClient.DefaultRequestHeaders.Authorization = null;
}
| |
/// <summary>
/// Sends a GET request to <c>/personal-access-tokens</c> and reads the body as a list of
/// <see cref="PersonalAccessTokenResponse"/>.
/// </summary>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized list on a success status, or an empty list when the body deserializes to
/// <c>null</c> or when <c>HandleResponseAsync</c> does not throw for a failed response.
/// </returns>
public async Task<IReadOnlyList<PersonalAccessTokenResponse>> GetPersonalAccessTokensAsync(
    CancellationToken token = default)
{
    var httpResponse = await _httpClient.GetAsync("/personal-access-tokens", token);
    if (!httpResponse.IsSuccessStatusCode)
    {
        await HandleResponseAsync(httpResponse);
        return Array.Empty<PersonalAccessTokenResponse>();
    }

    var tokens = await httpResponse.Content.ReadFromJsonAsync<IReadOnlyList<PersonalAccessTokenResponse>>(
        _jsonSerializerOptions, token);
    return tokens ?? Array.Empty<PersonalAccessTokenResponse>();
}
| |
/// <summary>
/// POSTs a serialized <see cref="CreatePersonalAccessTokenRequest"/> to <c>/personal-access-tokens</c>.
/// </summary>
/// <param name="name">Passed to the request.</param>
/// <param name="expiry">Passed to the request.</param>
/// <param name="token">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="RawPersonalAccessToken"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
public async Task<RawPersonalAccessToken?> CreatePersonalAccessTokenAsync(string name, ulong? expiry = null,
    CancellationToken token = default)
{
    var json = JsonSerializer.Serialize(new CreatePersonalAccessTokenRequest(name, expiry), _jsonSerializerOptions);

    var content = new StringContent(json, Encoding.UTF8, "application/json");
    var response = await _httpClient.PostAsync("/personal-access-tokens", content, token);
    if (response.IsSuccessStatusCode)
    {
        return await response.Content.ReadFromJsonAsync<RawPersonalAccessToken>(_jsonSerializerOptions, token);
    }

    // A failed response must never be deserialized as a token: if the handler does not throw,
    // return null, as the other create methods in this class do.
    await HandleResponseAsync(response);
    return null;
}
| |
/// <summary>
/// Sends a DELETE request to <c>/personal-access-tokens/{name}</c>.
/// </summary>
/// <param name="name">
/// Token name. It is percent-encoded before being placed in the path so that characters such as
/// <c>/</c>, <c>?</c> or <c>#</c> cannot change the request target.
/// </param>
/// <param name="token">Cancels the HTTP call.</param>
public async Task DeletePersonalAccessTokenAsync(string name, CancellationToken token = default)
{
    var escapedName = Uri.EscapeDataString(name);
    var response = await _httpClient.DeleteAsync($"/personal-access-tokens/{escapedName}", token);
    if (!response.IsSuccessStatusCode)
    {
        await HandleResponseAsync(response);
    }
}
| |
/// <summary>
/// POSTs a serialized <see cref="LoginWithPersonalAccessTokenRequest"/> to
/// <c>/personal-access-tokens/login</c> and, on success, installs the returned JWT as the default
/// <c>Bearer</c> Authorization header of the HTTP client.
/// </summary>
/// <param name="token">Personal access token passed to the request.</param>
/// <param name="ct">Cancels the HTTP call and the response read.</param>
/// <returns>
/// The deserialized <see cref="AuthResponse"/> on a success status; otherwise <c>null</c>
/// when <c>HandleResponseAsync</c> does not throw for the failed response.
/// </returns>
/// <exception cref="Exception">
/// The response succeeded but the body, its access token, or the token string was null or empty.
/// </exception>
public async Task<AuthResponse?> LoginWithPersonalAccessToken(string token, CancellationToken ct = default)
{
    var json = JsonSerializer.Serialize(new LoginWithPersonalAccessTokenRequest(token), _jsonSerializerOptions);

    var content = new StringContent(json, Encoding.UTF8, "application/json");
    var response = await _httpClient.PostAsync("/personal-access-tokens/login", content, ct);
    if (response.IsSuccessStatusCode)
    {
        var authResponse = await response.Content.ReadFromJsonAsync<AuthResponse>(_jsonSerializerOptions, ct);

        // Null-conditional access: a null body or access token must surface as the
        // "missing token" error below rather than as a NullReferenceException.
        var jwtToken = authResponse?.AccessToken?.Token;
        if (string.IsNullOrEmpty(jwtToken))
        {
            throw new Exception("The JWT token is missing.");
        }

        _httpClient.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Bearer", jwtToken);

        return authResponse;
    }

    await HandleResponseAsync(response);

    return null;
}
| |
/// <summary>
/// Does nothing: no resources are released here, and the <see cref="HttpClient"/> passed to the
/// constructor is left untouched.
/// </summary>
public void Dispose()
{
    // No-op.
}
| |
/// <summary>
/// Converts a failed HTTP response into an exception.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item>3xx and 4xx statuses throw <see cref="InvalidResponseException"/> carrying the response body.</item>
/// <item>
/// A 404 for a GET request is ignored unless <paramref name="shouldThrowOnGetNotFound"/> is <c>true</c>.
/// </item>
/// <item>Every 5xx status throws, so server failures are never silently swallowed.</item>
/// <item>Other statuses, including all success statuses, return without throwing.</item>
/// </list>
/// </remarks>
/// <param name="response">Response to inspect.</param>
/// <param name="shouldThrowOnGetNotFound">When <c>true</c>, a 404 on a GET request also throws.</param>
/// <exception cref="InvalidResponseException">The status is 3xx or 4xx and is not an ignored GET 404.</exception>
/// <exception cref="Exception">The status is 5xx.</exception>
private static async Task HandleResponseAsync(HttpResponseMessage response, bool shouldThrowOnGetNotFound = false)
{
    var statusCode = (int)response.StatusCode;

    if (statusCode is >= 300 and < 500)
    {
        // RequestMessage may be null (e.g. a hand-built response); treat that as "not a GET"
        // instead of dereferencing it.
        var isIgnoredGetNotFound = !shouldThrowOnGetNotFound
                                   && response.StatusCode == HttpStatusCode.NotFound
                                   && response.RequestMessage?.Method == HttpMethod.Get;
        if (isIgnoredGetNotFound)
        {
            return;
        }

        var err = await response.Content.ReadAsStringAsync();
        throw new InvalidResponseException(err);
    }

    if (response.StatusCode == HttpStatusCode.InternalServerError)
    {
        throw new Exception("Internal server error");
    }

    if (statusCode >= 500)
    {
        // Previously only 500 threw; 502/503/504 and the like returned silently.
        throw new Exception($"Server error: {statusCode}");
    }
}
| |
/// <summary>
/// Returns the string produced by the supplied <see cref="MessageRequestInterpolationHandler"/>.
/// </summary>
/// <param name="message">Handler whose <c>ToString()</c> result is returned.</param>
/// <returns>The result of calling <c>ToString()</c> on <paramref name="message"/>.</returns>
private static string CreateUrl(ref MessageRequestInterpolationHandler message)
    => message.ToString();
| } |