Changed Value on Message from property to method.
Cancel pending Send requests when receiving a CloseProducer command.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 60d3eb1..d77b178 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -54,7 +54,7 @@
{
await foreach (var message in consumer.Messages(cancellationToken))
{
- Console.WriteLine("Received: " + message.Value);
+ Console.WriteLine("Received: " + message.Value());
await consumer.Acknowledge(message, cancellationToken);
}
}
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 475fccd..627b3e5 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -54,7 +54,7 @@
{
await foreach (var message in reader.Messages(cancellationToken))
{
- Console.WriteLine("Received: " + message.Value);
+ Console.WriteLine("Received: " + message.Value());
}
}
catch (OperationCanceledException) { }
diff --git a/src/DotPulsar/Abstractions/IMessageOfT.cs b/src/DotPulsar/Abstractions/IMessageOfT.cs
index 6b90ae5..792791f 100644
--- a/src/DotPulsar/Abstractions/IMessageOfT.cs
+++ b/src/DotPulsar/Abstractions/IMessageOfT.cs
@@ -22,6 +22,6 @@
/// <summary>
/// The value of the message.
/// </summary>
- public TValue Value { get; }
+ public TValue Value();
}
}
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
index 5aa1e40..08fecd7 100644
--- a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -20,6 +20,9 @@
using System.Threading;
using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IConsumerBuilder.
+ /// </summary>
public static class ConsumerBuilderExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Extensions/MessageBuilderExtensions.cs b/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
index fe87ead..aaf2da3 100644
--- a/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/MessageBuilderExtensions.cs
@@ -20,6 +20,9 @@
using System.Threading;
using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IMessageBuilder.
+ /// </summary>
public static class MessageBuilderExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
index 7c4a42c..2c11133 100644
--- a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
@@ -20,6 +20,9 @@
using System.Threading;
using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IProducerBuilder.
+ /// </summary>
public static class ProducerBuilderExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs b/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
index 2ec6c8d..863872d 100644
--- a/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientBuilderExtensions.cs
@@ -19,6 +19,9 @@
using System;
using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IPulsarClientBuilder.
+ /// </summary>
public static class PulsarClientBuilderExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
index c9d4b69..7f86286 100644
--- a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
@@ -20,6 +20,9 @@
using System.Threading;
using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IReaderBuilder.
+ /// </summary>
public static class ReaderBuilderExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Internal/Abstractions/IRequest.cs b/src/DotPulsar/Internal/Abstractions/IRequest.cs
new file mode 100644
index 0000000..f97314f
--- /dev/null
+++ b/src/DotPulsar/Internal/Abstractions/IRequest.cs
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+ using System;
+
+ public interface IRequest : IEquatable<IRequest> { }
+}
diff --git a/src/DotPulsar/Internal/Awaiter.cs b/src/DotPulsar/Internal/Awaiter.cs
index ebbd254..6cec51c 100644
--- a/src/DotPulsar/Internal/Awaiter.cs
+++ b/src/DotPulsar/Internal/Awaiter.cs
@@ -16,6 +16,7 @@
{
using System;
using System.Collections.Concurrent;
+ using System.Collections.Generic;
using System.Threading.Tasks;
public sealed class Awaiter<T, TResult> : IDisposable where T : notnull
@@ -38,6 +39,14 @@
tcs.SetResult(result);
}
+ public void Cancel(T item)
+ {
+ if (_items.TryRemove(item, out var tcs))
+ tcs.SetCanceled();
+ }
+
+ public IEnumerable<T> Keys => _items.Keys;
+
public void Dispose()
{
foreach (var item in _items.Values)
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 56bd58d..8ac06f5 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -255,6 +255,7 @@
break;
case BaseCommand.Type.CloseProducer:
_channelManager.Incoming(command.CloseProducer);
+ _requestResponseHandler.Incoming(command.CloseProducer);
break;
case BaseCommand.Type.Ping:
_pingPongHandler.GotPing();
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index e31f5a3..a988189 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -17,7 +17,7 @@
#if NETSTANDARD2_0
public sealed class DotPulsarEventSource
{
- public static readonly DotPulsarEventSource Log = new DotPulsarEventSource();
+ public static readonly DotPulsarEventSource Log = new();
public void ClientCreated() { }
@@ -78,7 +78,7 @@
private long _currentReaders;
#pragma warning restore IDE0052 // Remove unread private members
- public static readonly DotPulsarEventSource Log = new DotPulsarEventSource();
+ public static readonly DotPulsarEventSource Log = new();
public DotPulsarEventSource() : base("DotPulsar") { }
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 867c21d..0a76401 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -77,112 +77,112 @@
});
public static BaseCommand AsBaseCommand(this CommandAck command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Ack,
Ack = command
};
public static BaseCommand AsBaseCommand(this CommandConnect command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Connect,
Connect = command
};
public static BaseCommand AsBaseCommand(this CommandPing command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Ping,
Ping = command
};
public static BaseCommand AsBaseCommand(this CommandPong command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Pong,
Pong = command
};
public static BaseCommand AsBaseCommand(this CommandProducer command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Producer,
Producer = command
};
public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.GetLastMessageId,
GetLastMessageId = command
};
public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Unsubscribe,
Unsubscribe = command
};
public static BaseCommand AsBaseCommand(this CommandSubscribe command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Subscribe,
Subscribe = command
};
public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Lookup,
LookupTopic = command
};
public static BaseCommand AsBaseCommand(this CommandSend command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Send,
Send = command
};
public static BaseCommand AsBaseCommand(this CommandFlow command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Flow,
Flow = command
};
public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.CloseProducer,
CloseProducer = command
};
public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.CloseConsumer,
CloseConsumer = command
};
public static BaseCommand AsBaseCommand(this CommandSeek command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.Seek,
Seek = command
};
public static BaseCommand AsBaseCommand(this CommandRedeliverUnacknowledgedMessages command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages,
RedeliverUnacknowledgedMessages = command
};
public static BaseCommand AsBaseCommand(this CommandGetOrCreateSchema command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.GetOrCreateSchema,
GetOrCreateSchema = command
diff --git a/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
index 857b00c..8f59a64 100644
--- a/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
@@ -19,7 +19,7 @@
public static class MessageIdDataExtensions
{
public static MessageId ToMessageId(this MessageIdData messageIdData)
- => new MessageId(messageIdData.LedgerId, messageIdData.EntryId, messageIdData.Partition, messageIdData.BatchIndex);
+ => new(messageIdData.LedgerId, messageIdData.EntryId, messageIdData.Partition, messageIdData.BatchIndex);
public static void MapFrom(this MessageIdData destination, MessageId source)
{
diff --git a/src/DotPulsar/Internal/Message.cs b/src/DotPulsar/Internal/Message.cs
index a5b0972..8de1222 100644
--- a/src/DotPulsar/Internal/Message.cs
+++ b/src/DotPulsar/Internal/Message.cs
@@ -93,6 +93,6 @@
public IReadOnlyDictionary<string, string> Properties { get; }
- public TValue Value => _schema.Decode(Data);
+ public TValue Value() => _schema.Decode(Data);
}
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 4427752..597387a 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -26,10 +26,10 @@
public sealed class NotReadyChannel<TMessage> : IConsumerChannel<TMessage>, IProducerChannel
{
public ValueTask DisposeAsync()
- => new ValueTask();
+ => new();
public ValueTask ClosedByClient(CancellationToken cancellationToken)
- => new ValueTask();
+ => new();
public ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken = default)
=> throw GetException();
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 20ae831..d23aa62 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -61,7 +61,7 @@
}
}
- public ValueTask DisposeAsync() => new ValueTask();
+ public ValueTask DisposeAsync() => new();
public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 23ef8c5..7d9a1ac 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -64,7 +64,7 @@
private async ValueTask<byte[]?> GetSchemaVersion(IConnection connection, CancellationToken cancellationToken)
{
- if (_schema is null)
+ if (_schema is null || _schema.Type == Schema.SchemaType.None)
return null;
var command = new CommandGetOrCreateSchema
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 55777f6..5f9bc04 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -14,30 +14,30 @@
namespace DotPulsar.Internal
{
+ using DotPulsar.Internal.Abstractions;
+ using DotPulsar.Internal.Requests;
using PulsarApi;
using System;
using System.Threading.Tasks;
public sealed class RequestResponseHandler : IDisposable
{
- private const string _connectResponseIdentifier = "Connected";
-
- private readonly Awaiter<string, BaseCommand> _responses;
+ private readonly Awaiter<IRequest, BaseCommand> _requests;
private readonly RequestId _requestId;
public RequestResponseHandler()
{
- _responses = new Awaiter<string, BaseCommand>();
+ _requests = new Awaiter<IRequest, BaseCommand>();
_requestId = new RequestId();
}
public void Dispose()
- => _responses.Dispose();
+ => _requests.Dispose();
public Task<BaseCommand> Outgoing(BaseCommand command)
{
SetRequestId(command);
- return _responses.CreateTask(GetResponseIdentifier(command));
+ return _requests.CreateTask(GetResponseIdentifier(command));
}
public void Incoming(BaseCommand command)
@@ -45,7 +45,17 @@
var identifier = GetResponseIdentifier(command);
if (identifier is not null)
- _responses.SetResult(identifier, command);
+ _requests.SetResult(identifier, command);
+ }
+
+ public void Incoming(CommandCloseProducer command)
+ {
+ var requests = _requests.Keys;
+ foreach (var request in requests)
+ {
+ if (request is SendRequest sendRequest && sendRequest.ProducerId == command.ProducerId)
+ _requests.Cancel(request);
+ }
}
private void SetRequestId(BaseCommand cmd)
@@ -85,29 +95,29 @@
}
}
- private string GetResponseIdentifier(BaseCommand cmd)
+ private IRequest GetResponseIdentifier(BaseCommand cmd)
=> cmd.CommandType switch
{
- BaseCommand.Type.Connect => _connectResponseIdentifier,
- BaseCommand.Type.Connected => _connectResponseIdentifier,
- BaseCommand.Type.Send => $"{cmd.Send.ProducerId}-{cmd.Send.SequenceId}",
- BaseCommand.Type.SendError => $"{cmd.SendError.ProducerId}-{cmd.SendError.SequenceId}",
- BaseCommand.Type.SendReceipt => $"{cmd.SendReceipt.ProducerId}-{cmd.SendReceipt.SequenceId}",
- BaseCommand.Type.Error => !_requestId.IsPastInitialId() ? _connectResponseIdentifier : cmd.Error.RequestId.ToString(),
- BaseCommand.Type.Producer => cmd.Producer.RequestId.ToString(),
- BaseCommand.Type.ProducerSuccess => cmd.ProducerSuccess.RequestId.ToString(),
- BaseCommand.Type.CloseProducer => cmd.CloseProducer.RequestId.ToString(),
- BaseCommand.Type.Lookup => cmd.LookupTopic.RequestId.ToString(),
- BaseCommand.Type.LookupResponse => cmd.LookupTopicResponse.RequestId.ToString(),
- BaseCommand.Type.Unsubscribe => cmd.Unsubscribe.RequestId.ToString(),
- BaseCommand.Type.Subscribe => cmd.Subscribe.RequestId.ToString(),
- BaseCommand.Type.Success => cmd.Success.RequestId.ToString(),
- BaseCommand.Type.Seek => cmd.Seek.RequestId.ToString(),
- BaseCommand.Type.CloseConsumer => cmd.CloseConsumer.RequestId.ToString(),
- BaseCommand.Type.GetLastMessageId => cmd.GetLastMessageId.RequestId.ToString(),
- BaseCommand.Type.GetLastMessageIdResponse => cmd.GetLastMessageIdResponse.RequestId.ToString(),
- BaseCommand.Type.GetOrCreateSchema => cmd.GetOrCreateSchema.RequestId.ToString(),
- BaseCommand.Type.GetOrCreateSchemaResponse => cmd.GetOrCreateSchemaResponse.RequestId.ToString(),
+ BaseCommand.Type.Send => new SendRequest(cmd.Send.ProducerId, cmd.Send.SequenceId),
+ BaseCommand.Type.SendReceipt => new SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId),
+ BaseCommand.Type.SendError => new SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId),
+ BaseCommand.Type.Connect => new ConnectRequest(),
+ BaseCommand.Type.Connected => new ConnectRequest(),
+ BaseCommand.Type.Error => !_requestId.IsPastInitialId() ? new ConnectRequest() : new StandardRequest(cmd.Error.RequestId),
+ BaseCommand.Type.Producer => new StandardRequest(cmd.Producer.RequestId),
+ BaseCommand.Type.ProducerSuccess => new StandardRequest(cmd.ProducerSuccess.RequestId),
+ BaseCommand.Type.CloseProducer => new StandardRequest(cmd.CloseProducer.RequestId),
+ BaseCommand.Type.Lookup => new StandardRequest(cmd.LookupTopic.RequestId),
+ BaseCommand.Type.LookupResponse => new StandardRequest(cmd.LookupTopicResponse.RequestId),
+ BaseCommand.Type.Unsubscribe => new StandardRequest(cmd.Unsubscribe.RequestId),
+ BaseCommand.Type.Subscribe => new StandardRequest(cmd.Subscribe.RequestId),
+ BaseCommand.Type.Success => new StandardRequest(cmd.Success.RequestId),
+ BaseCommand.Type.Seek => new StandardRequest(cmd.Seek.RequestId),
+ BaseCommand.Type.CloseConsumer => new StandardRequest(cmd.CloseConsumer.RequestId),
+ BaseCommand.Type.GetLastMessageId => new StandardRequest(cmd.GetLastMessageId.RequestId),
+ BaseCommand.Type.GetLastMessageIdResponse => new StandardRequest(cmd.GetLastMessageIdResponse.RequestId),
+ BaseCommand.Type.GetOrCreateSchema => new StandardRequest(cmd.GetOrCreateSchema.RequestId),
+ BaseCommand.Type.GetOrCreateSchemaResponse => new StandardRequest(cmd.GetOrCreateSchemaResponse.RequestId),
_ => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type")
};
}
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
new file mode 100644
index 0000000..ceca386
--- /dev/null
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Requests
+{
+ using DotPulsar.Internal.Abstractions;
+ using System.Diagnostics.CodeAnalysis;
+
+ public struct ConnectRequest : IRequest
+ {
+#if NETSTANDARD2_0
+ public bool Equals(IRequest other)
+#else
+ public bool Equals([AllowNull] IRequest other)
+#endif
+ => other is ConnectRequest;
+
+ public override int GetHashCode()
+ => int.MinValue;
+ }
+}
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs b/src/DotPulsar/Internal/Requests/SendRequest.cs
new file mode 100644
index 0000000..5f15fd2
--- /dev/null
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Requests
+{
+ using DotPulsar.Internal.Abstractions;
+ using System;
+ using System.Diagnostics.CodeAnalysis;
+
+ public struct SendRequest : IRequest
+ {
+ public ulong ProducerId { get; }
+ public ulong SequenceId { get; }
+
+ public SendRequest(ulong producerId, ulong sequenceId)
+ {
+ ProducerId = producerId;
+ SequenceId = sequenceId;
+ }
+
+#if NETSTANDARD2_0
+ public bool Equals(IRequest other)
+#else
+ public bool Equals([AllowNull] IRequest other)
+#endif
+ {
+ if (other is SendRequest request)
+ return ProducerId.Equals(request.ProducerId) && SequenceId.Equals(request.SequenceId);
+
+ return false;
+ }
+
+ public override int GetHashCode()
+ => HashCode.Combine(ProducerId, SequenceId);
+ }
+}
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs
new file mode 100644
index 0000000..61b656e
--- /dev/null
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Requests
+{
+ using DotPulsar.Internal.Abstractions;
+ using System;
+ using System.Diagnostics.CodeAnalysis;
+
+ public struct StandardRequest : IRequest
+ {
+ public ulong RequestId { get; }
+
+ public StandardRequest(ulong requestId)
+ => RequestId = requestId;
+
+#if NETSTANDARD2_0
+ public bool Equals(IRequest other)
+#else
+ public bool Equals([AllowNull] IRequest other)
+#endif
+ {
+ if (other is StandardRequest request)
+ return RequestId.Equals(request.RequestId);
+
+ return false;
+ }
+
+ public override int GetHashCode()
+ => HashCode.Combine(RequestId);
+ }
+}
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index 332a94e..78a3d3c 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -107,7 +107,7 @@
public async Task DequeueAsync_GivenTokenIsCanceled_ShouldCancelTask()
{
//Arrange
- CancellationTokenSource source1 = new CancellationTokenSource(), source2 = new CancellationTokenSource();
+ CancellationTokenSource source1 = new(), source2 = new();
const int excepted = 1;
var queue = new AsyncQueue<int>();
var task1 = queue.Dequeue(source1.Token).AsTask();