first full cleanup
diff --git a/.editorconfig b/.editorconfig
index 5021ae4..44b4c97 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -9,6 +9,21 @@
# Microsoft .NET properties
csharp_using_directive_placement = inside_namespace:silent
+# ReSharper properties
+resharper_constructor_or_destructor_body=expression_body
+resharper_local_function_body=expression_body
+resharper_method_or_operator_body=expression_body
+resharper_wrap_before_arrow_with_expressions=true
+resharper_csharp_insert_final_newline=true
+resharper_csharp_keep_blank_lines_in_code=1
+resharper_csharp_keep_blank_lines_in_declarations=1
+resharper_empty_block_style=together_same_line
+resharper_keep_existing_expr_member_arrangement=false
+resharper_place_expr_accessor_on_single_line=true
+resharper_place_expr_property_on_single_line=true
+resharper_space_within_single_line_array_initializer_braces=true
+resharper_use_indent_from_vs=false
+
[{*.yaml, *.yml}]
indent_size = 2
diff --git a/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml b/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml
index 0521549..25b0629 100644
--- a/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml
+++ b/.idea/.idea.DotPulsar/.idea/codeStyles/Project.xml
@@ -1,8 +1,8 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<option name="AUTODETECT_INDENTS" value="false" />
- <option name="RIGHT_MARGIN" value="160" />
+ <option name="RIGHT_MARGIN" value="180" />
<option name="WRAP_WHEN_TYPING_REACHES_RIGHT_MARGIN" value="true" />
- <option name="SOFT_MARGINS" value="140,160" />
+ <option name="SOFT_MARGINS" value="180" />
</code_scheme>
</component>
\ No newline at end of file
diff --git a/DotPulsar.sln.DotSettings b/DotPulsar.sln.DotSettings
index cbd4041..c011518 100644
--- a/DotPulsar.sln.DotSettings
+++ b/DotPulsar.sln.DotSettings
@@ -1,4 +1,8 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
+ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeAccessorOwnerBody/@EntryIndexedValue">DO_NOT_SHOW</s:String>
+ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeConstructorOrDestructorBody/@EntryIndexedValue"></s:String>
+ <s:Boolean x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeConstructorOrDestructorBody/@EntryIndexRemoved">True</s:Boolean>
+ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=SuggestDiscardDeclarationVarStyle/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=DotPulsar_003A_0020Full_0020Cleanup/@EntryIndexedValue"><?xml version="1.0" encoding="utf-16"?><Profile name="DotPulsar: Full Cleanup"><XMLReformatCode>True</XMLReformatCode><CSCodeStyleAttributes ArrangeTypeAccessModifier="True" ArrangeTypeMemberAccessModifier="True" SortModifiers="True" RemoveRedundantParentheses="True" AddMissingParentheses="True" ArrangeBraces="True" ArrangeAttributes="True" ArrangeArgumentsStyle="True" ArrangeCodeBodyStyle="True" ArrangeVarStyle="True" ArrangeTrailingCommas="True" /><CSOptimizeUsings><OptimizeUsings>True</OptimizeUsings><EmbraceInRegion>False</EmbraceInRegion><RegionName></RegionName></CSOptimizeUsings><CSReformatCode>True</CSReformatCode><RemoveCodeRedundanciesVB>True</RemoveCodeRedundanciesVB><VBOptimizeImports>True</VBOptimizeImports><VBShortenReferences>True</VBShortenReferences><VBFormatDocComments>True</VBFormatDocComments><FormatAttributeQuoteDescriptor>True</FormatAttributeQuoteDescriptor><XAMLCollapseEmptyTags>False</XAMLCollapseEmptyTags><RemoveCodeRedundancies>True</RemoveCodeRedundancies><CSUseAutoProperty>True</CSUseAutoProperty><CSArrangeQualifiers>True</CSArrangeQualifiers><CSShortenReferences>True</CSShortenReferences><CSMakeFieldReadonly>True</CSMakeFieldReadonly><CSMakeAutoPropertyGetOnly>True</CSMakeAutoPropertyGetOnly><IDEA_SETTINGS>&lt;profile version="1.0"&gt;
&lt;option name="myName" value="DotPulsar: Full Cleanup" /&gt;
&lt;inspection_tool class="ES6ShorthandObjectProperty" enabled="false" level="INFORMATION" enabled_by_default="false" /&gt;
@@ -14,5 +18,36 @@
&lt;inspection_tool class="UnnecessaryReturnJS" enabled="false" level="WARNING" enabled_by_default="false" /&gt;
&lt;/profile&gt;</IDEA_SETTINGS></Profile></s:String>
<s:String x:Key="/Default/CodeStyle/CodeCleanup/SilentCleanupProfile/@EntryValue">DotPulsar: Full Cleanup</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/BRACES_FOR_FOR/@EntryValue">RequiredForMultiline</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/BRACES_FOR_FOREACH/@EntryValue">RequiredForMultiline</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/BRACES_FOR_WHILE/@EntryValue">RequiredForMultiline</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/CONSTRUCTOR_OR_DESTRUCTOR_BODY/@EntryValue">ExpressionBody</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/LOCAL_FUNCTION_BODY/@EntryValue">ExpressionBody</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/METHOD_OR_OPERATOR_BODY/@EntryValue">ExpressionBody</s:String>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/USE_HEURISTICS_FOR_BODY_STYLE/@EntryValue">False</s:Boolean>
+ <s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/BLANK_LINES_BEFORE_MULTILINE_STATEMENTS/@EntryValue">1</s:Int64>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/EMPTY_BLOCK_STYLE/@EntryValue">TOGETHER_SAME_LINE</s:String>
+ <s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_BLANK_LINES_IN_CODE/@EntryValue">1</s:Int64>
+ <s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_BLANK_LINES_IN_DECLARATIONS/@EntryValue">1</s:Int64>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_EMBEDDED_ARRANGEMENT/@EntryValue">False</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_EXPR_MEMBER_ARRANGEMENT/@EntryValue">False</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_INITIALIZER_ARRANGEMENT/@EntryValue">False</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_EXISTING_SWITCH_EXPRESSION_ARRANGEMENT/@EntryValue">False</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/LINE_FEED_AT_FILE_END/@EntryValue">True</s:Boolean>
+ <s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_ARRAY_INITIALIZER_ELEMENTS_ON_LINE/@EntryValue">4</s:Int64>
+ <s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_INITIALIZER_ELEMENTS_ON_LINE/@EntryValue">1</s:Int64>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_ACCESSORHOLDER_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_EXPR_ACCESSOR_ON_SINGLE_LINE/@EntryValue">ALWAYS</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_EXPR_METHOD_ON_SINGLE_LINE/@EntryValue">NEVER</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_EXPR_PROPERTY_ON_SINGLE_LINE/@EntryValue">ALWAYS</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_FIELD_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_CASE_STATEMENT_ON_SAME_LINE/@EntryValue">ALWAYS</s:String>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_EMBEDDED_STATEMENT_ON_SAME_LINE/@EntryValue">NEVER</s:String>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_INITIALIZER_ON_SINGLE_LINE/@EntryValue">False</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_WITHIN_SINGLE_LINE_ARRAY_INITIALIZER_BRACES/@EntryValue">True</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_ARROW_WITH_EXPRESSIONS/@EntryValue">True</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_EXTENDS_COLON/@EntryValue">True</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_FIRST_TYPE_PARAMETER_CONSTRAINT/@EntryValue">True</s:Boolean>
+ <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_OBJECT_AND_COLLECTION_INITIALIZER_STYLE/@EntryValue">CHOP_ALWAYS</s:String>
<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
</wpf:ResourceDictionary>
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 5b58a14..b792652 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -12,20 +12,20 @@
* limitations under the License.
*/
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-using System;
-using System.Buffers;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace Consuming
{
- class Program
+ using System;
+ using System.Buffers;
+ using System.Text;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
+
+ internal class Program
{
- static async Task Main(string[] args)
+ private static async Task Main(string[] args)
{
const string myTopic = "persistent://public/default/mytopic";
@@ -68,10 +68,7 @@
await consumer.Acknowledge(message, cancellationToken).ConfigureAwait(false);
}
}
- catch(OperationCanceledException)
- {
- return;
- }
+ catch (OperationCanceledException) { }
}
private static async Task Monitor(IConsumer consumer)
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index b742d02..b268c6a 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -12,19 +12,19 @@
* limitations under the License.
*/
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-using System;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace Producing
{
- class Program
+ using System;
+ using System.Text;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
+
+ internal class Program
{
- static async Task Main(string[] args)
+ private static async Task Main(string[] args)
{
const string myTopic = "persistent://public/default/mytopic";
@@ -61,15 +61,13 @@
{
while (!cancellationToken.IsCancellationRequested)
{
- var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow.ToString());
+ var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow);
_ = await producer.Send(data, cancellationToken).ConfigureAwait(false);
await Task.Delay(delay).ConfigureAwait(false);
}
}
catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
- {
- return;
- }
+ { }
}
private static async Task Monitor(IProducer producer)
@@ -84,10 +82,10 @@
var stateMessage = state switch
{
- ProducerState.Connected => $"The producer is connected",
- ProducerState.Disconnected => $"The producer is disconnected",
- ProducerState.Closed => $"The producer has closed",
- ProducerState.Faulted => $"The producer has faulted",
+ ProducerState.Connected => "The producer is connected",
+ ProducerState.Disconnected => "The producer is disconnected",
+ ProducerState.Closed => "The producer has closed",
+ ProducerState.Faulted => "The producer has faulted",
_ => $"The producer has an unknown state '{state}'"
};
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 515cf60..3dbe68f 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -12,20 +12,20 @@
* limitations under the License.
*/
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-using System;
-using System.Buffers;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace Reading
{
- class Program
+ using System;
+ using System.Buffers;
+ using System.Text;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
+
+ internal class Program
{
- static async Task Main(string[] args)
+ private static async Task Main(string[] args)
{
const string myTopic = "persistent://public/default/mytopic";
@@ -36,7 +36,6 @@
.Topic(myTopic)
.Create();
-
var monitoring = Monitor(reader);
var cts = new CancellationTokenSource();
@@ -44,7 +43,7 @@
var reading = ReadMessages(reader, cts.Token);
Console.WriteLine("Press a key to exit");
-
+
_ = Console.ReadKey();
cts.Cancel();
@@ -68,10 +67,7 @@
Console.WriteLine("Received: " + data);
}
}
- catch(OperationCanceledException)
- {
- return;
- }
+ catch (OperationCanceledException) { }
}
private static async Task Monitor(IReader reader)
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index a43112f..4d1c54c 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Collections.Generic;
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A consumer abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IHandleException.cs b/src/DotPulsar/Abstractions/IHandleException.cs
index 17e73ce..f4acbcf 100644
--- a/src/DotPulsar/Abstractions/IHandleException.cs
+++ b/src/DotPulsar/Abstractions/IHandleException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System.Threading.Tasks;
-
namespace DotPulsar.Abstractions
{
+ using System.Threading.Tasks;
+
/// <summary>
/// An exception handling abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
index 0cea4c4..c5812ed 100644
--- a/src/DotPulsar/Abstractions/IMessageBuilder.cs
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A message building abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index d75ca77..dec0362 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A producer abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index 339ec02..ddce2e8 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System;
-
namespace DotPulsar.Abstractions
{
+ using System;
+
/// <summary>
/// A pulsar client abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 37e0e9f..55cd63b 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System;
-using System.Security.Cryptography.X509Certificates;
-using System.Threading.Tasks;
-
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Security.Cryptography.X509Certificates;
+ using System.Threading.Tasks;
+
/// <summary>
/// A pulsar client building abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 7510cbe..d3a04c1 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Threading;
-
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Collections.Generic;
+ using System.Threading;
+
/// <summary>
/// A reader abstraction.
/// </summary>
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Abstractions/IStateChanged.cs
index 46bb5cc..079b9be 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IStateChanged.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Abstractions
{
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A state change monitoring abstraction.
/// </summary>
diff --git a/src/DotPulsar/ExceptionContext.cs b/src/DotPulsar/ExceptionContext.cs
index f2d88fa..c911791 100644
--- a/src/DotPulsar/ExceptionContext.cs
+++ b/src/DotPulsar/ExceptionContext.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System;
-using System.Threading;
-
namespace DotPulsar
{
+ using System;
+ using System.Threading;
+
public sealed class ExceptionContext
{
internal ExceptionContext(Exception exception, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs b/src/DotPulsar/Exceptions/AuthenticationException.cs
index e26ebd6..b9634b2 100644
--- a/src/DotPulsar/Exceptions/AuthenticationException.cs
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System;
-
namespace DotPulsar.Exceptions
{
+ using System;
+
public sealed class AuthenticationException : DotPulsarException
{
public AuthenticationException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerBusyException.cs b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
index 3d5f87c..3f6214e 100644
--- a/src/DotPulsar/Exceptions/ConsumerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
@@ -18,4 +18,4 @@
{
public ConsumerBusyException(string message) : base(message) { }
}
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
index f6079bd..c11206b 100644
--- a/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerDisposedException.cs
@@ -1,8 +1,8 @@
-using DotPulsar.Internal;
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
{
+ using System;
+ using Internal;
+
public sealed class ConsumerDisposedException : ObjectDisposedException
{
public ConsumerDisposedException() : base(typeof(Consumer).FullName) { }
diff --git a/src/DotPulsar/Exceptions/DotPulsarException.cs b/src/DotPulsar/Exceptions/DotPulsarException.cs
index 07e1f4f..158f906 100644
--- a/src/DotPulsar/Exceptions/DotPulsarException.cs
+++ b/src/DotPulsar/Exceptions/DotPulsarException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System;
-
namespace DotPulsar.Exceptions
{
+ using System;
+
public abstract class DotPulsarException : Exception
{
public DotPulsarException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ProducerBusyException.cs b/src/DotPulsar/Exceptions/ProducerBusyException.cs
index 4e1c0f8..094e615 100644
--- a/src/DotPulsar/Exceptions/ProducerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ProducerBusyException.cs
@@ -18,4 +18,4 @@
{
public ProducerBusyException(string message) : base(message) { }
}
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Exceptions/ProducerDisposedException.cs b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
index c881b45..aee0b97 100644
--- a/src/DotPulsar/Exceptions/ProducerDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ProducerDisposedException.cs
@@ -1,8 +1,8 @@
-using DotPulsar.Internal;
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
{
+ using System;
+ using Internal;
+
public sealed class ProducerDisposedException : ObjectDisposedException
{
public ProducerDisposedException() : base(typeof(Producer).FullName) { }
diff --git a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
index f4a4f00..1066730 100644
--- a/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
+++ b/src/DotPulsar/Exceptions/PulsarClientDisposedException.cs
@@ -1,7 +1,7 @@
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
{
+ using System;
+
public sealed class PulsarClientDisposedException : ObjectDisposedException
{
public PulsarClientDisposedException() : base(typeof(PulsarClient).FullName) { }
diff --git a/src/DotPulsar/Exceptions/ReaderDisposedException.cs b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
index ee02457..d95f5c8 100644
--- a/src/DotPulsar/Exceptions/ReaderDisposedException.cs
+++ b/src/DotPulsar/Exceptions/ReaderDisposedException.cs
@@ -1,8 +1,8 @@
-using DotPulsar.Internal;
-using System;
-
-namespace DotPulsar.Exceptions
+namespace DotPulsar.Exceptions
{
+ using System;
+ using Internal;
+
public sealed class ReaderDisposedException : ObjectDisposedException
{
public ReaderDisposedException() : base(typeof(Reader).FullName) { }
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index b45c97f..bf37687 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -12,13 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Internal;
-
namespace DotPulsar.Extensions
{
+ using Abstractions;
+ using Internal;
+
public static class ProducerExtensions
{
- public static IMessageBuilder NewMessage(this IProducer producer) => new MessageBuilder(producer);
+ public static IMessageBuilder NewMessage(this IProducer producer)
+ => new MessageBuilder(producer);
}
}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index abdf6a9..8a8698a 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -12,15 +12,20 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Internal;
-
namespace DotPulsar.Extensions
{
+ using Abstractions;
+ using Internal;
+
public static class PulsarClientExtensions
{
- public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient) => new ProducerBuilder(pulsarClient);
- public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient) => new ConsumerBuilder(pulsarClient);
- public static IReaderBuilder NewReader(this IPulsarClient pulsarClient) => new ReaderBuilder(pulsarClient);
+ public static IProducerBuilder NewProducer(this IPulsarClient pulsarClient)
+ => new ProducerBuilder(pulsarClient);
+
+ public static IConsumerBuilder NewConsumer(this IPulsarClient pulsarClient)
+ => new ConsumerBuilder(pulsarClient);
+
+ public static IReaderBuilder NewReader(this IPulsarClient pulsarClient)
+ => new ReaderBuilder(pulsarClient);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index cce261d..1e6857e 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using PulsarApi;
+
public interface IConnection : IAsyncDisposable
{
ValueTask<bool> HasChannels(CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
index 656d78c..ad313b1 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnectionPool.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IConnectionPool : IAsyncDisposable
{
ValueTask<IConnection> FindConnectionForTopic(string topic, CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index a2cbe34..bbca812 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using PulsarApi;
+
public interface IConsumerChannel : IAsyncDisposable
{
Task Send(CommandAck command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
index b8bbca2..3065119 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannelFactory.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IConsumerChannelFactory
{
Task<IConsumerChannel> Create(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IDequeue.cs b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
index fbda477..f99da40 100644
--- a/src/DotPulsar/Internal/Abstractions/IDequeue.cs
+++ b/src/DotPulsar/Internal/Abstractions/IDequeue.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IDequeue<T>
{
ValueTask<T> Dequeue(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IExecute.cs b/src/DotPulsar/Internal/Abstractions/IExecute.cs
index 8a3336a..1b4ec62 100644
--- a/src/DotPulsar/Internal/Abstractions/IExecute.cs
+++ b/src/DotPulsar/Internal/Abstractions/IExecute.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IExecute
{
ValueTask Execute(Action action, CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IProcess.cs b/src/DotPulsar/Internal/Abstractions/IProcess.cs
index 9b3e0e3..4c48c39 100644
--- a/src/DotPulsar/Internal/Abstractions/IProcess.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProcess.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+
public interface IProcess : IAsyncDisposable
{
Guid CorrelationId { get; }
@@ -23,4 +23,4 @@
void Start();
void Handle(IEvent @event);
}
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index f15fca2..43a97a9 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -12,17 +12,17 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using PulsarApi;
+
public interface IProducerChannel : IAsyncDisposable
{
Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
- Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+ Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
}
-}
\ No newline at end of file
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
index 4f6fefb..2f84785 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IProducerChannelFactory
{
Task<IProducerChannel> Create(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs b/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs
index d032675..0ce67ef 100644
--- a/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs
+++ b/src/DotPulsar/Internal/Abstractions/IPulsarStream.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IPulsarStream : IAsyncDisposable
{
Task Send(ReadOnlySequence<byte> sequence);
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index b2ef7b0..64c2289 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IReaderChannel : IAsyncDisposable
{
ValueTask<Message> Receive(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
index 46e83b4..c651dbf 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal.Abstractions
{
+ using System.Threading;
+ using System.Threading.Tasks;
+
public interface IReaderChannelFactory
{
Task<IReaderChannel> Create(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IStateManager.cs b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
index 0226f7a..83f5e19 100644
--- a/src/DotPulsar/Internal/Abstractions/IStateManager.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Abstractions;
+
public interface IStateManager<TState> : IStateChanged<TState> where TState : notnull
{
TState CurrentState { get; }
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index bd2d8c4..cad8312 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Collections.Generic;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Exceptions;
+
public sealed class AsyncLock : IAsyncDisposable
{
private readonly LinkedList<CancelableCompletionSource<IDisposable>> _pending;
@@ -33,7 +33,7 @@
_pending = new LinkedList<CancelableCompletionSource<IDisposable>>();
_semaphoreSlim = new SemaphoreSlim(1, 1);
_releaser = new Releaser(Release);
- _completedTask = Task.FromResult((IDisposable)_releaser);
+ _completedTask = Task.FromResult((IDisposable) _releaser);
}
public Task<IDisposable> Lock(CancellationToken cancellationToken)
@@ -68,9 +68,7 @@
return;
foreach (var pending in _pending)
- {
pending.Dispose();
- }
_pending.Clear();
}
@@ -124,9 +122,11 @@
{
private readonly Action _release;
- public Releaser(Action release) => _release = release;
+ public Releaser(Action release)
+ => _release = release;
- public void Dispose() => _release();
+ public void Dispose()
+ => _release();
}
}
}
diff --git a/src/DotPulsar/Internal/AsyncLockExecutor.cs b/src/DotPulsar/Internal/AsyncLockExecutor.cs
index 98d2663..9258784 100644
--- a/src/DotPulsar/Internal/AsyncLockExecutor.cs
+++ b/src/DotPulsar/Internal/AsyncLockExecutor.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+
public sealed class AsyncLockExecutor : IExecute, IAsyncDisposable
{
private readonly AsyncLock _lock;
@@ -30,7 +30,8 @@
_executor = executor;
}
- public async ValueTask DisposeAsync() => await _lock.DisposeAsync().ConfigureAwait(false);
+ public async ValueTask DisposeAsync()
+ => await _lock.DisposeAsync().ConfigureAwait(false);
public async ValueTask Execute(Action action, CancellationToken cancellationToken)
{
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs
index 70519de..5d9286e 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Collections.Generic;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Exceptions;
+
public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable
{
private readonly object _lock;
@@ -48,7 +48,9 @@
tcs.Value.SetResult(item);
}
else
+ {
_queue.Enqueue(item);
+ }
}
}
diff --git a/src/DotPulsar/Internal/Awaitor.cs b/src/DotPulsar/Internal/Awaitor.cs
index 62cd3e8..130e467 100644
--- a/src/DotPulsar/Internal/Awaitor.cs
+++ b/src/DotPulsar/Internal/Awaitor.cs
@@ -12,26 +12,27 @@
* limitations under the License.
*/
-using System;
-using System.Collections.Concurrent;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
- public sealed class Awaitor<T, Result> : IDisposable
+ using System;
+ using System.Collections.Concurrent;
+ using System.Threading.Tasks;
+
+ public sealed class Awaitor<T, TResult> : IDisposable
{
- private readonly ConcurrentDictionary<T, TaskCompletionSource<Result>> _items;
+ private readonly ConcurrentDictionary<T, TaskCompletionSource<TResult>> _items;
- public Awaitor() => _items = new ConcurrentDictionary<T, TaskCompletionSource<Result>>();
+ public Awaitor()
+ => _items = new ConcurrentDictionary<T, TaskCompletionSource<TResult>>();
- public Task<Result> CreateTask(T item)
+ public Task<TResult> CreateTask(T item)
{
- var tcs = new TaskCompletionSource<Result>(TaskCreationOptions.RunContinuationsAsynchronously);
+ var tcs = new TaskCompletionSource<TResult>(TaskCreationOptions.RunContinuationsAsynchronously);
_ = _items.TryAdd(item, tcs);
return tcs.Task;
}
- public void SetResult(T item, Result result)
+ public void SetResult(T item, TResult result)
{
if (_items.TryRemove(item, out var tcs))
tcs.SetResult(result);
@@ -40,9 +41,7 @@
public void Dispose()
{
foreach (var item in _items.Values)
- {
item.SetCanceled();
- }
_items.Clear();
}
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index c8e8c58..9aaff16 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System.Buffers;
-using System.Collections;
-using System.Collections.Generic;
-
namespace DotPulsar.Internal
{
+ using System.Buffers;
+ using System.Collections;
+ using System.Collections.Generic;
+ using Extensions;
+ using PulsarApi;
+
public sealed class BatchHandler
{
private readonly bool _trackBatches;
@@ -33,12 +33,13 @@
_batches = new LinkedList<Batch>();
}
- public Message Add(MessageIdData messageId, PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data)
+ public Message Add(MessageIdData messageId, MessageMetadata metadata, ReadOnlySequence<byte> data)
{
if (_trackBatches)
_batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
long index = 0;
+
for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
{
var singleMetadataSize = data.ReadUInt32(index, true);
@@ -48,13 +49,14 @@
var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
var message = new Message(singleMessageId, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
_messages.Enqueue(message);
- index += (uint)singleMetadata.PayloadSize;
+ index += (uint) singleMetadata.PayloadSize;
}
return _messages.Dequeue();
}
- public Message? GetNext() => _messages.Count == 0 ? null : _messages.Dequeue();
+ public Message? GetNext()
+ => _messages.Count == 0 ? null : _messages.Dequeue();
public void Clear()
{
@@ -72,11 +74,13 @@
continue;
batch.Acknowledge(messageId.BatchIndex);
+
if (batch.IsAcknowledged())
{
_batches.Remove(batch);
return batch.MessageId;
}
+
break;
}
@@ -95,7 +99,8 @@
public MessageIdData MessageId { get; }
- public void Acknowledge(int batchIndex) => _acknowledgementIndex.Set(batchIndex, true);
+ public void Acknowledge(int batchIndex)
+ => _acknowledgementIndex.Set(batchIndex, true);
public bool IsAcknowledged()
{
diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs b/src/DotPulsar/Internal/CancelableCompletionSource.cs
index ddc3428..c099aa3 100644
--- a/src/DotPulsar/Internal/CancelableCompletionSource.cs
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -12,24 +12,28 @@
* limitations under the License.
*/
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
public sealed class CancelableCompletionSource<T> : IDisposable
{
private readonly TaskCompletionSource<T> _source;
private CancellationTokenRegistration? _registration;
- public CancelableCompletionSource() => _source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
+ public CancelableCompletionSource()
+ => _source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
- public void SetupCancellation(Action callback, CancellationToken token) => _registration = token.Register(() => callback());
+ public void SetupCancellation(Action callback, CancellationToken token)
+ => _registration = token.Register(() => callback());
- public void SetResult(T result) => _ = _source.TrySetResult(result);
+ public void SetResult(T result)
+ => _ = _source.TrySetResult(result);
- public void SetException(Exception exception) => _ = _source.TrySetException(exception);
+ public void SetException(Exception exception)
+ => _ = _source.TrySetException(exception);
public Task<T> Task => _source.Task;
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index 2d5d73e..9fe0819 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-
namespace DotPulsar.Internal
{
+ using System;
+ using Abstractions;
+ using Events;
+
public sealed class Channel : IChannel
{
private readonly Guid _correlationId;
@@ -31,13 +31,28 @@
_enqueue = enqueue;
}
- public void Received(MessagePackage message) => _enqueue.Enqueue(message);
- public void Activated() => _eventRegister.Register(new ChannelActivated(_correlationId));
- public void ClosedByServer() => _eventRegister.Register(new ChannelClosedByServer(_correlationId));
- public void Connected() => _eventRegister.Register(new ChannelConnected(_correlationId));
- public void Deactivated() => _eventRegister.Register(new ChannelDeactivated(_correlationId));
- public void Disconnected() => _eventRegister.Register(new ChannelDisconnected(_correlationId));
- public void ReachedEndOfTopic() => _eventRegister.Register(new ChannelReachedEndOfTopic(_correlationId));
- public void Unsubscribed() => _eventRegister.Register(new ChannelUnsubscribed(_correlationId));
+ public void Received(MessagePackage message)
+ => _enqueue.Enqueue(message);
+
+ public void Activated()
+ => _eventRegister.Register(new ChannelActivated(_correlationId));
+
+ public void ClosedByServer()
+ => _eventRegister.Register(new ChannelClosedByServer(_correlationId));
+
+ public void Connected()
+ => _eventRegister.Register(new ChannelConnected(_correlationId));
+
+ public void Deactivated()
+ => _eventRegister.Register(new ChannelDeactivated(_correlationId));
+
+ public void Disconnected()
+ => _eventRegister.Register(new ChannelDisconnected(_correlationId));
+
+ public void ReachedEndOfTopic()
+ => _eventRegister.Register(new ChannelReachedEndOfTopic(_correlationId));
+
+ public void Unsubscribed()
+ => _eventRegister.Register(new ChannelUnsubscribed(_correlationId));
}
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index d03e99d..b0d6b27 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Extensions;
+ using PulsarApi;
+
public sealed class ChannelManager : IDisposable
{
private readonly IdLookup<IChannel> _consumerChannels;
@@ -32,12 +32,14 @@
_producerChannels = new IdLookup<IChannel>();
}
- public bool HasChannels() => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
+ public bool HasChannels()
+ => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
public Task<ProducerResponse> Outgoing(CommandProducer command, Task<BaseCommand> response, IChannel channel)
{
var producerId = _producerChannels.Add(channel);
command.ProducerId = producerId;
+
return response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Error)
@@ -45,6 +47,7 @@
_producerChannels.Remove(producerId);
result.Result.Error.Throw();
}
+
channel.Connected();
return new ProducerResponse(producerId, result.Result.ProducerSuccess.ProducerName);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
@@ -54,6 +57,7 @@
{
var consumerId = _consumerChannels.Add(channel);
command.ConsumerId = consumerId;
+
return response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Error)
@@ -61,6 +65,7 @@
_consumerChannels.Remove(consumerId);
result.Result.Error.Throw();
}
+
channel.Connected();
return new SubscribeResponse(consumerId);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
@@ -97,6 +102,7 @@
if (result.Result.CommandType == BaseCommand.Type.Success)
{
var channel = _consumerChannels.Remove(consumerId);
+
if (channel != null)
channel.Unsubscribed();
}
@@ -106,6 +112,7 @@
public void Incoming(CommandCloseConsumer command)
{
var channel = _consumerChannels.Remove(command.ConsumerId);
+
if (channel != null)
channel.ClosedByServer();
}
@@ -113,6 +120,7 @@
public void Incoming(CommandCloseProducer command)
{
var inbox = _producerChannels.Remove(command.ProducerId);
+
if (inbox != null)
inbox.ClosedByServer();
}
@@ -120,6 +128,7 @@
public void Incoming(CommandActiveConsumerChange command)
{
var channel = _consumerChannels[command.ConsumerId];
+
if (channel is null)
return;
@@ -132,6 +141,7 @@
public void Incoming(CommandReachedEndOfTopic command)
{
var channel = _consumerChannels[command.ConsumerId];
+
if (channel != null)
channel.ReachedEndOfTopic();
}
@@ -139,6 +149,7 @@
public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
{
var consumer = _consumerChannels[command.ConsumerId];
+
if (consumer != null)
consumer.Received(new MessagePackage(command.MessageId, data));
}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index b5a2372..6446817 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Exceptions;
+ using Extensions;
+ using PulsarApi;
+
public sealed class Connection : IConnection
{
private readonly AsyncLock _lock;
@@ -85,17 +85,17 @@
return await responseTask.ConfigureAwait(false);
}
- public async Task Send(CommandPing command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task Send(CommandPing command, CancellationToken cancellationToken)
+ => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
- public async Task Send(CommandPong command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task Send(CommandPong command, CancellationToken cancellationToken)
+ => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
- public async Task Send(CommandAck command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task Send(CommandAck command, CancellationToken cancellationToken)
+ => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
- public async Task Send(CommandFlow command, CancellationToken cancellationToken) =>
- await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task Send(CommandFlow command, CancellationToken cancellationToken)
+ => await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
@@ -115,17 +115,17 @@
return await responseTask.ConfigureAwait(false);
}
- public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken)
+ => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
- public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken)
+ => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
- public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken)
+ => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
- public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) =>
- await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
+ public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ => await SendRequestResponse(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
{
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index 106d2cc..e5dce0a 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -12,17 +12,17 @@
* limitations under the License.
*/
-using System;
-using System.IO;
-using System.Net;
-using System.Net.Security;
-using System.Net.Sockets;
-using System.Security.Authentication;
-using System.Security.Cryptography.X509Certificates;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.IO;
+ using System.Net;
+ using System.Net.Security;
+ using System.Net.Sockets;
+ using System.Security.Authentication;
+ using System.Security.Cryptography.X509Certificates;
+ using System.Threading.Tasks;
+
public sealed class Connector
{
private readonly X509Certificate2Collection _clientCertificates;
@@ -30,7 +30,8 @@
private readonly bool _verifyCertificateAuthority;
private readonly bool _verifyCertificateName;
- public Connector(X509Certificate2Collection clientCertificates, X509Certificate2? trustedCertificateAuthority, bool verifyCertificateAuthority, bool verifyCertificateName)
+ public Connector(X509Certificate2Collection clientCertificates, X509Certificate2? trustedCertificateAuthority, bool verifyCertificateAuthority,
+ bool verifyCertificateName)
{
_clientCertificates = clientCertificates;
_trustedCertificateAuthority = trustedCertificateAuthority;
@@ -84,7 +85,7 @@
try
{
- sslStream = new SslStream(stream, false, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
+ sslStream = new SslStream(stream, false, ValidateServerCertificate, null);
await sslStream.AuthenticateAsClientAsync(host, _clientCertificates, SslProtocols.None, true).ConfigureAwait(false);
return sslStream;
}
@@ -116,7 +117,8 @@
return false;
chain.ChainPolicy.ExtraStore.Add(_trustedCertificateAuthority);
- _ = chain.Build((X509Certificate2)certificate);
+ _ = chain.Build((X509Certificate2) certificate);
+
for (var i = 0; i < chain.ChainElements.Count; i++)
{
if (chain.ChainElements[i].Certificate.Thumbprint == _trustedCertificateAuthority.Thumbprint)
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index c1d5f1c..6e6a51c 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System.Reflection;
-
namespace DotPulsar.Internal
{
+ using System.Reflection;
+
public static class Constants
{
static Constants()
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index d239d83..5f0e763 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -12,19 +12,19 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Collections.Generic;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Collections.Generic;
+ using System.Runtime.CompilerServices;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using Events;
+ using PulsarApi;
+
public sealed class Consumer : IConsumer
{
private readonly Guid _correlationId;
@@ -59,9 +59,11 @@
public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
=> await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
- public bool IsFinalState() => _state.IsFinalState();
+ public bool IsFinalState()
+ => _state.IsFinalState();
- public bool IsFinalState(ConsumerState state) => _state.IsFinalState(state);
+ public bool IsFinalState(ConsumerState state)
+ => _state.IsFinalState(state);
public async ValueTask DisposeAsync()
{
@@ -77,9 +79,7 @@
ThrowIfDisposed();
while (!cancellationToken.IsCancellationRequested)
- {
yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
- }
}
public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
@@ -105,19 +105,21 @@
ThrowIfDisposed();
var seek = new CommandSeek { MessageId = messageId.Data };
_ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
- return;
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken).ConfigureAwait(false);
+
+ var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken)
+ .ConfigureAwait(false);
return new MessageId(response.LastMessageId);
}
private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
{
ThrowIfDisposed();
+
await _executor.Execute(() =>
{
_cachedCommandAck.Type = ackType;
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 64b6c7a..15b2875 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal
{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+
public sealed class ConsumerBuilder : IConsumerBuilder
{
private readonly IPulsarClient _pulsarClient;
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 9a05dce..44cfd94 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Extensions;
+ using PulsarApi;
+
public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
{
private readonly ulong _id;
@@ -57,6 +57,7 @@
_sendWhenZero--;
var message = _batchHandler.GetNext();
+
if (message != null)
return message;
@@ -82,9 +83,11 @@
public async Task Send(CommandAck command, CancellationToken cancellationToken)
{
var messageId = command.MessageIds[0];
+
if (messageId.BatchIndex != -1)
{
var batchMessageId = _batchHandler.Acknowledge(messageId);
+
if (batchMessageId is null)
return;
@@ -140,7 +143,7 @@
if (_firstFlow)
{
- _cachedCommandFlow.MessagePermits = (uint)Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
+ _cachedCommandFlow.MessagePermits = (uint) Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
_firstFlow = false;
}
@@ -149,11 +152,7 @@
private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken)
{
- var ack = new CommandAck
- {
- Type = CommandAck.AckType.Individual,
- validation_error = CommandAck.ValidationError.ChecksumMismatch
- };
+ var ack = new CommandAck { Type = CommandAck.AckType.Individual, validation_error = CommandAck.ValidationError.ChecksumMismatch };
ack.MessageIds.Add(messagePackage.MessageId);
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index aa07a40..183c0e3 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using PulsarApi;
+
public sealed class ConsumerChannelFactory : IConsumerChannelFactory
{
private readonly Guid _correlationId;
@@ -46,12 +46,12 @@
_subscribe = new CommandSubscribe
{
ConsumerName = options.ConsumerName,
- initialPosition = (CommandSubscribe.InitialPosition)options.InitialPosition,
+ initialPosition = (CommandSubscribe.InitialPosition) options.InitialPosition,
PriorityLevel = options.PriorityLevel,
ReadCompacted = options.ReadCompacted,
Subscription = options.SubscriptionName,
Topic = options.Topic,
- Type = (CommandSubscribe.SubType)options.SubscriptionType
+ Type = (CommandSubscribe.SubType) options.SubscriptionType
};
_batchHandler = new BatchHandler(true);
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index af75d0f..7db9994 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading.Tasks;
+ using Abstractions;
+
public sealed class ConsumerProcess : Process
{
private readonly IStateManager<ConsumerState> _stateManager;
@@ -38,7 +38,7 @@
_isFailoverSubscription = isFailoverSubscription;
}
- public async override ValueTask DisposeAsync()
+ public override async ValueTask DisposeAsync()
{
_stateManager.SetState(ConsumerState.Closed);
CancellationTokenSource.Cancel();
diff --git a/src/DotPulsar/Internal/Crc32C.cs b/src/DotPulsar/Internal/Crc32C.cs
index b348189..2dfac40 100644
--- a/src/DotPulsar/Internal/Crc32C.cs
+++ b/src/DotPulsar/Internal/Crc32C.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System.Buffers;
-
namespace DotPulsar.Internal
{
+ using System.Buffers;
+
public static class Crc32C
{
private const uint Generator = 0x82F63B78u;
@@ -29,13 +29,13 @@
for (uint i = 0; i < 256; i++)
{
var entry = i;
+
for (var j = 0; j < 16; j++)
{
for (var k = 0; k < 8; k++)
- {
- entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : (entry >> 1);
- }
- Lookup[(j * 256) + i] = entry;
+ entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : entry >> 1;
+
+ Lookup[j * 256 + i] = entry;
}
}
}
@@ -51,20 +51,21 @@
foreach (var memory in sequence)
{
var span = memory.Span;
+
for (var i = 0; i < span.Length; ++i)
{
var currentByte = span[i];
if (!readingBlock)
{
- checksum = Lookup[(byte)(checksum ^ currentByte)] ^ checksum >> 8;
+ checksum = Lookup[(byte) (checksum ^ currentByte)] ^ (checksum >> 8);
continue;
}
var offSetBase = offset * 256;
if (offset > 11)
- block[offset] = Lookup[offSetBase + ((byte)(checksum >> (8 * (15 - offset))) ^ currentByte)];
+ block[offset] = Lookup[offSetBase + ((byte) (checksum >> (8 * (15 - offset))) ^ currentByte)];
else
block[offset] = Lookup[offSetBase + currentByte];
@@ -75,6 +76,7 @@
offset = 15;
readingBlock = remaningBytes >= 16;
checksum = 0;
+
for (var j = 0; j < block.Length; ++j)
checksum ^= block[j];
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 4c8b4f0..855657c 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -12,25 +12,27 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Net.Sockets;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using Exceptions;
+
public sealed class DefaultExceptionHandler : IHandleException
{
private readonly TimeSpan _retryInterval;
- public DefaultExceptionHandler(TimeSpan retryInterval) => _retryInterval = retryInterval;
+ public DefaultExceptionHandler(TimeSpan retryInterval)
+ => _retryInterval = retryInterval;
public async ValueTask OnException(ExceptionContext exceptionContext)
{
exceptionContext.Result = DetermineFaultAction(exceptionContext.Exception, exceptionContext.CancellationToken);
+
if (exceptionContext.Result == FaultAction.Retry)
await Task.Delay(_retryInterval, exceptionContext.CancellationToken).ConfigureAwait(false);
@@ -41,15 +43,24 @@
{
switch (exception)
{
- case TooManyRequestsException _: return FaultAction.Retry;
- case ChannelNotReadyException _: return FaultAction.Retry;
- case ServiceNotReadyException _: return FaultAction.Retry;
- case ConnectionDisposedException _: return FaultAction.Retry;
- case AsyncLockDisposedException _: return FaultAction.Retry;
- case PulsarStreamDisposedException _: return FaultAction.Retry;
- case AsyncQueueDisposedException _: return FaultAction.Retry;
- case OperationCanceledException _: return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry;
- case DotPulsarException _: return FaultAction.Rethrow;
+ case TooManyRequestsException _:
+ return FaultAction.Retry;
+ case ChannelNotReadyException _:
+ return FaultAction.Retry;
+ case ServiceNotReadyException _:
+ return FaultAction.Retry;
+ case ConnectionDisposedException _:
+ return FaultAction.Retry;
+ case AsyncLockDisposedException _:
+ return FaultAction.Retry;
+ case PulsarStreamDisposedException _:
+ return FaultAction.Retry;
+ case AsyncQueueDisposedException _:
+ return FaultAction.Retry;
+ case OperationCanceledException _:
+ return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry;
+ case DotPulsarException _:
+ return FaultAction.Rethrow;
case SocketException socketException:
switch (socketException.SocketErrorCode)
{
@@ -58,6 +69,7 @@
case SocketError.NetworkUnreachable:
return FaultAction.Rethrow;
}
+
return FaultAction.Retry;
}
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index b9f2e95..3042270 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System.Diagnostics.Tracing;
-using System.Threading;
-
namespace DotPulsar.Internal
{
#if NETSTANDARD2_1
+ using System.Diagnostics.Tracing;
+ using System.Threading;
+
public sealed class DotPulsarEventSource : EventSource
{
private readonly PollingCounter _totalClientsCounter;
@@ -164,16 +164,25 @@
public sealed class DotPulsarEventSource
{
public static readonly DotPulsarEventSource Log = new DotPulsarEventSource();
- public DotPulsarEventSource() { }
+
public void ClientCreated() { }
+
public void ClientDisposed() { }
+
public void ConnectionCreated() { }
+
public void ConnectionDisposed() { }
+
public void ConsumerCreated() { }
+
public void ConsumerDisposed() { }
+
public void ProducerCreated() { }
+
public void ProducerDisposed() { }
+
public void ReaderCreated() { }
+
public void ReaderDisposed() { }
}
#endif
diff --git a/src/DotPulsar/Internal/Events/ChannelActivated.cs b/src/DotPulsar/Internal/Events/ChannelActivated.cs
index 1f8df0a..d5d4d44 100644
--- a/src/DotPulsar/Internal/Events/ChannelActivated.cs
+++ b/src/DotPulsar/Internal/Events/ChannelActivated.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelActivated : IEvent
{
- public ChannelActivated(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelActivated(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs b/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs
index 6b64a53..7c14bbf 100644
--- a/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs
+++ b/src/DotPulsar/Internal/Events/ChannelClosedByServer.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelClosedByServer : IEvent
{
- public ChannelClosedByServer(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelClosedByServer(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ChannelConnected.cs b/src/DotPulsar/Internal/Events/ChannelConnected.cs
index 63b1b36..83cc366 100644
--- a/src/DotPulsar/Internal/Events/ChannelConnected.cs
+++ b/src/DotPulsar/Internal/Events/ChannelConnected.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelConnected : IEvent
{
- public ChannelConnected(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelConnected(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ChannelDeactivated.cs b/src/DotPulsar/Internal/Events/ChannelDeactivated.cs
index 0ab9a0d..fa47c19 100644
--- a/src/DotPulsar/Internal/Events/ChannelDeactivated.cs
+++ b/src/DotPulsar/Internal/Events/ChannelDeactivated.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelDeactivated : IEvent
{
- public ChannelDeactivated(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelDeactivated(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ChannelDisconnected.cs b/src/DotPulsar/Internal/Events/ChannelDisconnected.cs
index 09972bd..a3b223e 100644
--- a/src/DotPulsar/Internal/Events/ChannelDisconnected.cs
+++ b/src/DotPulsar/Internal/Events/ChannelDisconnected.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelDisconnected : IEvent
{
- public ChannelDisconnected(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelDisconnected(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs b/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs
index 6edfa94..7f16ec5 100644
--- a/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs
+++ b/src/DotPulsar/Internal/Events/ChannelReachedEndOfTopic.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelReachedEndOfTopic : IEvent
{
- public ChannelReachedEndOfTopic(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelReachedEndOfTopic(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs b/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs
index c53e716..eae9ef4 100644
--- a/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs
+++ b/src/DotPulsar/Internal/Events/ChannelUnsubscribed.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ChannelUnsubscribed : IEvent
{
- public ChannelUnsubscribed(Guid correlationId) => CorrelationId = correlationId;
+ public ChannelUnsubscribed(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ConsumerCreated.cs b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
index db1b88b..0add8f0 100644
--- a/src/DotPulsar/Internal/Events/ConsumerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerCreated.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ConsumerCreated : IEvent
{
public ConsumerCreated(Guid correlationId, Consumer consumer)
diff --git a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
index 074e1db..be05a6f 100644
--- a/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ConsumerDisposed.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ConsumerDisposed : IEvent
{
public ConsumerDisposed(Guid correlationId, Consumer consumer)
diff --git a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
index 8fc30ef..2b4c356 100644
--- a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
+++ b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ExecutorFaulted : IEvent
{
- public ExecutorFaulted(Guid correlationId) => CorrelationId = correlationId;
+ public ExecutorFaulted(Guid correlationId)
+ => CorrelationId = correlationId;
public Guid CorrelationId { get; }
}
diff --git a/src/DotPulsar/Internal/Events/ProducerCreated.cs b/src/DotPulsar/Internal/Events/ProducerCreated.cs
index 679d7fc..8df093e 100644
--- a/src/DotPulsar/Internal/Events/ProducerCreated.cs
+++ b/src/DotPulsar/Internal/Events/ProducerCreated.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ProducerCreated : IEvent
{
public ProducerCreated(Guid correlationId, Producer producer)
diff --git a/src/DotPulsar/Internal/Events/ProducerDisposed.cs b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
index a348086..7de116e 100644
--- a/src/DotPulsar/Internal/Events/ProducerDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ProducerDisposed.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ProducerDisposed : IEvent
{
public ProducerDisposed(Guid correlationId, Producer producer)
diff --git a/src/DotPulsar/Internal/Events/ReaderCreated.cs b/src/DotPulsar/Internal/Events/ReaderCreated.cs
index 7a354fa..beb0750 100644
--- a/src/DotPulsar/Internal/Events/ReaderCreated.cs
+++ b/src/DotPulsar/Internal/Events/ReaderCreated.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ReaderCreated : IEvent
{
public ReaderCreated(Guid correlationId, Reader reader)
diff --git a/src/DotPulsar/Internal/Events/ReaderDisposed.cs b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
index a8647d2..ba1cb11 100644
--- a/src/DotPulsar/Internal/Events/ReaderDisposed.cs
+++ b/src/DotPulsar/Internal/Events/ReaderDisposed.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-
namespace DotPulsar.Internal.Events
{
+ using System;
+ using Abstractions;
+
public sealed class ReaderDisposed : IEvent
{
public ReaderDisposed(Guid correlationId, Reader reader)
diff --git a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
index ebf164c..fd5f619 100644
--- a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
+++ b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
@@ -12,24 +12,26 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading.Tasks;
+ using DotPulsar.Abstractions;
+
public sealed class ExceptionHandlerPipeline : IHandleException
{
private readonly IHandleException[] _handlers;
- public ExceptionHandlerPipeline(IEnumerable<IHandleException> handlers) => _handlers = handlers.ToArray();
+ public ExceptionHandlerPipeline(IEnumerable<IHandleException> handlers)
+ => _handlers = handlers.ToArray();
public async ValueTask OnException(ExceptionContext exceptionContext)
{
foreach (var handler in _handlers)
{
await handler.OnException(exceptionContext).ConfigureAwait(false);
+
if (exceptionContext.ExceptionHandled)
break;
}
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
index b3dbbdd..0efb519 100644
--- a/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/AsyncLockDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
namespace DotPulsar.Internal.Exceptions
{
+ using System;
+
public sealed class AsyncLockDisposedException : ObjectDisposedException
{
public AsyncLockDisposedException() : base(typeof(AsyncLock).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
index 27156cd..ea5f5fd 100644
--- a/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/AsyncQueueDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
namespace DotPulsar.Internal.Exceptions
{
+ using System;
+
public sealed class AsyncQueueDisposedException : ObjectDisposedException
{
public AsyncQueueDisposedException() : base(typeof(AsyncQueue<>).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
index 649420d..64178cd 100644
--- a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal.Exceptions
{
+ using DotPulsar.Exceptions;
+
public sealed class ChannelNotReadyException : DotPulsarException
{
public ChannelNotReadyException() : base("The service is not ready yet") { }
diff --git a/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
index e445908..7b0b1d4 100644
--- a/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ConnectionDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
namespace DotPulsar.Internal.Exceptions
{
+ using System;
+
public sealed class ConnectionDisposedException : ObjectDisposedException
{
public ConnectionDisposedException() : base(typeof(Connection).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
index f34323f..ade8ecd 100644
--- a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal.Exceptions
{
+ using DotPulsar.Exceptions;
+
public sealed class ConsumerNotFoundException : DotPulsarException
{
public ConsumerNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
index 8bd4567..b4ec0a0 100644
--- a/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
+++ b/src/DotPulsar/Internal/Exceptions/PulsarStreamDisposedException.cs
@@ -1,7 +1,21 @@
-using System;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
namespace DotPulsar.Internal.Exceptions
{
+ using System;
+
public sealed class PulsarStreamDisposedException : ObjectDisposedException
{
public PulsarStreamDisposedException() : base(typeof(PulsarStream).FullName) { }
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
index 0e5e313..d7f2975 100644
--- a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal.Exceptions
{
+ using DotPulsar.Exceptions;
+
public sealed class ServiceNotReadyException : DotPulsarException
{
public ServiceNotReadyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
index b2949a5..80c91c2 100644
--- a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal.Exceptions
{
+ using DotPulsar.Exceptions;
+
public sealed class TooManyRequestsException : DotPulsarException
{
public TooManyRequestsException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
index b72df3b..e8a1f98 100644
--- a/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
+++ b/src/DotPulsar/Internal/Exceptions/UnexpectedResponseException.cs
@@ -12,14 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Exceptions;
-using System;
-
namespace DotPulsar.Internal.Exceptions
{
+ using System;
+ using DotPulsar.Exceptions;
+
public sealed class UnexpectedResponseException : DotPulsarException
{
public UnexpectedResponseException(string message) : base(message) { }
+
public UnexpectedResponseException(string message, Exception innerException) : base(message, innerException) { }
}
}
diff --git a/src/DotPulsar/Internal/Executor.cs b/src/DotPulsar/Internal/Executor.cs
index f5ba80b..6463ce5 100644
--- a/src/DotPulsar/Internal/Executor.cs
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using DotPulsar.Abstractions;
+ using Events;
+
public sealed class Executor : IExecute
{
private readonly Guid _correlationId;
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 4c80d2e..8cf27ab 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.PulsarApi;
-
namespace DotPulsar.Internal.Extensions
{
+ using DotPulsar.Exceptions;
+ using Exceptions;
+ using PulsarApi;
+
public static class CommandExtensions
{
public static void Expect(this BaseCommand command, BaseCommand.Type type)
@@ -38,165 +38,137 @@
throw new UnexpectedResponseException($"Expected '{type}' but got '{command.CommandType}'");
}
- public static void Throw(this CommandSendError command) => Throw(command.Error, command.Message);
+ public static void Throw(this CommandSendError command)
+ => Throw(command.Error, command.Message);
- public static void Throw(this CommandLookupTopicResponse command) => Throw(command.Error, command.Message);
+ public static void Throw(this CommandLookupTopicResponse command)
+ => Throw(command.Error, command.Message);
- public static void Throw(this CommandError error) => Throw(error.Error, error.Message);
+ public static void Throw(this CommandError error)
+ => Throw(error.Error, error.Message);
private static void Throw(ServerError error, string message)
- {
- switch (error)
+ => throw (error switch
{
- case ServerError.AuthenticationError: throw new AuthenticationException(message);
- case ServerError.AuthorizationError: throw new AuthorizationException(message);
- case ServerError.ChecksumError: throw new ChecksumException(message);
- case ServerError.ConsumerAssignError: throw new ConsumerAssignException(message);
- case ServerError.ConsumerBusy: throw new ConsumerBusyException(message);
- case ServerError.ConsumerNotFound: throw new ConsumerNotFoundException(message);
- case ServerError.IncompatibleSchema: throw new IncompatibleSchemaException(message);
- case ServerError.InvalidTopicName: throw new InvalidTopicNameException(message);
- case ServerError.MetadataError: throw new MetadataException(message);
- case ServerError.PersistenceError: throw new PersistenceException(message);
- case ServerError.ProducerBlockedQuotaExceededError:
- case ServerError.ProducerBlockedQuotaExceededException:
- throw new ProducerBlockedQuotaExceededException(message + ". Error code: " + error);
- case ServerError.ProducerBusy: throw new ProducerBusyException(message);
- case ServerError.ServiceNotReady: throw new ServiceNotReadyException(message);
- case ServerError.SubscriptionNotFound: throw new SubscriptionNotFoundException(message);
- case ServerError.TooManyRequests: throw new TooManyRequestsException(message);
- case ServerError.TopicNotFound: throw new TopicNotFoundException(message);
- case ServerError.TopicTerminatedError: throw new TopicTerminatedException(message);
- case ServerError.UnsupportedVersionError: throw new UnsupportedVersionException(message);
- case ServerError.UnknownError:
- default: throw new UnknownException(message + ". Error code: " + error);
- }
- }
+ ServerError.AuthenticationError => new AuthenticationException(message),
+ ServerError.AuthorizationError => new AuthorizationException(message),
+ ServerError.ChecksumError => new ChecksumException(message),
+ ServerError.ConsumerAssignError => new ConsumerAssignException(message),
+ ServerError.ConsumerBusy => new ConsumerBusyException(message),
+ ServerError.ConsumerNotFound => new ConsumerNotFoundException(message),
+ ServerError.IncompatibleSchema => new IncompatibleSchemaException(message),
+ ServerError.InvalidTopicName => new InvalidTopicNameException(message),
+ ServerError.MetadataError => new MetadataException(message),
+ ServerError.PersistenceError => new PersistenceException(message),
+ ServerError.ProducerBlockedQuotaExceededError => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
+ ServerError.ProducerBlockedQuotaExceededException => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
+ ServerError.ProducerBusy => new ProducerBusyException(message),
+ ServerError.ServiceNotReady => new ServiceNotReadyException(message),
+ ServerError.SubscriptionNotFound => new SubscriptionNotFoundException(message),
+ ServerError.TooManyRequests => new TooManyRequestsException(message),
+ ServerError.TopicNotFound => new TopicNotFoundException(message),
+ ServerError.TopicTerminatedError => new TopicTerminatedException(message),
+ ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
+ ServerError.UnknownError => new UnknownException($"{message}. Error code: {error}"),
+ _ => new UnknownException($"{message}. Error code: {error}")
+ });
public static BaseCommand AsBaseCommand(this CommandAck command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Ack,
Ack = command
};
- }
public static BaseCommand AsBaseCommand(this CommandConnect command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Connect,
Connect = command
};
- }
public static BaseCommand AsBaseCommand(this CommandPing command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Ping,
Ping = command
};
- }
public static BaseCommand AsBaseCommand(this CommandPong command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Pong,
Pong = command
};
- }
public static BaseCommand AsBaseCommand(this CommandProducer command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Producer,
Producer = command
};
- }
public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.GetLastMessageId,
GetLastMessageId = command
};
- }
public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Unsubscribe,
Unsubscribe = command
};
- }
public static BaseCommand AsBaseCommand(this CommandSubscribe command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Subscribe,
Subscribe = command
};
- }
public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Lookup,
LookupTopic = command
};
- }
public static BaseCommand AsBaseCommand(this CommandSend command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Send,
Send = command
};
- }
public static BaseCommand AsBaseCommand(this CommandFlow command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Flow,
Flow = command
};
- }
public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.CloseProducer,
CloseProducer = command
};
- }
public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.CloseConsumer,
CloseConsumer = command
};
- }
public static BaseCommand AsBaseCommand(this CommandSeek command)
- {
- return new BaseCommand
+ => new BaseCommand
{
CommandType = BaseCommand.Type.Seek,
Seek = command
};
- }
}
}
diff --git a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
index 31aa5bf..e653650 100644
--- a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System;
-using Metadata = DotPulsar.Internal.PulsarApi.MessageMetadata;
-
namespace DotPulsar.Internal.Extensions
{
+ using System;
+ using Metadata = PulsarApi.MessageMetadata;
+
public static class MessageMetadataExtensions
{
public static DateTimeOffset GetDeliverAtTimeAsDateTimeOffset(this Metadata metadata)
@@ -26,10 +26,10 @@
=> metadata.DeliverAtTime = timestamp.ToUnixTimeMilliseconds();
public static DateTimeOffset GetEventTimeAsDateTimeOffset(this Metadata metadata)
- => DateTimeOffset.FromUnixTimeMilliseconds((long)metadata.EventTime);
+ => DateTimeOffset.FromUnixTimeMilliseconds((long) metadata.EventTime);
public static void SetEventTime(this Metadata metadata, DateTimeOffset timestamp)
- => metadata.EventTime = (ulong)timestamp.ToUnixTimeMilliseconds();
+ => metadata.EventTime = (ulong) timestamp.ToUnixTimeMilliseconds();
public static byte[]? GetKeyAsBytes(this Metadata metadata)
=> metadata.PartitionKeyB64Encoded ? Convert.FromBase64String(metadata.PartitionKey) : null;
diff --git a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
index 5a94334..2c53da0 100644
--- a/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
@@ -12,17 +12,18 @@
* limitations under the License.
*/
-using System.Buffers;
-
namespace DotPulsar.Internal.Extensions
{
+ using System.Buffers;
+ using PulsarApi;
+
public static class MessagePackageExtensions
{
public static uint GetMetadataSize(this MessagePackage package)
=> package.Data.ReadUInt32(Constants.MetadataSizeOffset, true);
- public static PulsarApi.MessageMetadata ExtractMetadata(this MessagePackage package, uint metadataSize)
- => Serializer.Deserialize<PulsarApi.MessageMetadata>(package.Data.Slice(Constants.MetadataOffset, metadataSize));
+ public static MessageMetadata ExtractMetadata(this MessagePackage package, uint metadataSize)
+ => Serializer.Deserialize<MessageMetadata>(package.Data.Slice(Constants.MetadataOffset, metadataSize));
public static ReadOnlySequence<byte> ExtractData(this MessagePackage package, uint metadataSize)
=> package.Data.Slice(Constants.MetadataOffset + metadataSize);
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index e530e46..5312166 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using System;
-using System.Buffers;
-
namespace DotPulsar.Internal.Extensions
{
+ using System;
+ using System.Buffers;
+
public static class ReadOnlySequenceExtensions
{
public static bool StartsWith<T>(this ReadOnlySequence<T> sequence, ReadOnlyMemory<T> target) where T : IEquatable<T>
@@ -30,12 +30,14 @@
foreach (var memory in sequence)
{
var span = memory.Span;
+
for (var i = 0; i < span.Length; ++i)
{
if (!span[i].Equals(targetSpan[targetIndex]))
return false;
++targetIndex;
+
if (targetIndex == targetSpan.Length)
return true;
}
@@ -46,7 +48,7 @@
public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, long start, bool isBigEndian)
{
- if (sequence.Length < (4 + start))
+ if (sequence.Length < 4 + start)
throw new ArgumentOutOfRangeException(nameof(start), start, "Sequence must be at least 4 bytes long from 'start' to end");
var reverse = isBigEndian != BitConverter.IsLittleEndian;
@@ -62,31 +64,41 @@
}
var span = memory.Span;
- for (var i = (int)start; i < span.Length; ++i, ++read)
+
+ for (var i = (int) start; i < span.Length; ++i, ++read)
{
switch (read)
{
case 0:
- if (reverse) union.B0 = span[i];
- else union.B3 = span[i];
+ if (reverse)
+ union.B0 = span[i];
+ else
+ union.B3 = span[i];
continue;
case 1:
- if (reverse) union.B1 = span[i];
- else union.B2 = span[i];
+ if (reverse)
+ union.B1 = span[i];
+ else
+ union.B2 = span[i];
continue;
case 2:
- if (reverse) union.B2 = span[i];
- else union.B1 = span[i];
+ if (reverse)
+ union.B2 = span[i];
+ else
+ union.B1 = span[i];
continue;
case 3:
- if (reverse) union.B3 = span[i];
- else union.B0 = span[i];
+ if (reverse)
+ union.B3 = span[i];
+ else
+ union.B0 = span[i];
break;
}
}
if (read == 3)
break;
+
start = 0;
}
diff --git a/src/DotPulsar/Internal/FuncExceptionHandler.cs b/src/DotPulsar/Internal/FuncExceptionHandler.cs
index 6e22d97..122e727 100644
--- a/src/DotPulsar/Internal/FuncExceptionHandler.cs
+++ b/src/DotPulsar/Internal/FuncExceptionHandler.cs
@@ -1,15 +1,17 @@
-using DotPulsar.Abstractions;
-using System;
-using System.Threading.Tasks;
-
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading.Tasks;
+ using DotPulsar.Abstractions;
+
public sealed class FuncExceptionHandler : IHandleException
{
private readonly Func<ExceptionContext, ValueTask> _exceptionHandler;
- public FuncExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler) => _exceptionHandler = exceptionHandler;
+ public FuncExceptionHandler(Func<ExceptionContext, ValueTask> exceptionHandler)
+ => _exceptionHandler = exceptionHandler;
- public ValueTask OnException(ExceptionContext exceptionContext) => _exceptionHandler(exceptionContext);
+ public ValueTask OnException(ExceptionContext exceptionContext)
+ => _exceptionHandler(exceptionContext);
}
}
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
index 60c010a..85c7ee0 100644
--- a/src/DotPulsar/Internal/IdLookup.cs
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -12,15 +12,16 @@
* limitations under the License.
*/
-using System.Collections.Generic;
-
namespace DotPulsar.Internal
{
+ using System.Collections.Generic;
+
public sealed class IdLookup<T> where T : class
{
private T?[] _items;
- public IdLookup() => _items = new T[1];
+ public IdLookup()
+ => _items = new T[1];
public bool IsEmpty()
{
@@ -46,7 +47,7 @@
continue;
_items[i] = item;
- return (ulong)i;
+ return (ulong) i;
}
var newArray = new T[_items.Length + 1];
@@ -54,7 +55,7 @@
var id = newArray.Length - 1;
newArray[id] = item;
_items = newArray;
- return (ulong)id;
+ return (ulong) id;
}
}
@@ -62,8 +63,8 @@
{
lock (_items)
{
- var item = _items[(int)id];
- _items[(int)id] = null;
+ var item = _items[(int) id];
+ _items[(int) id] = null;
return item;
}
}
@@ -73,15 +74,18 @@
lock (_items)
{
var items = new List<T>();
+
for (var i = 0; i < _items.Length; ++i)
{
var item = _items[i];
+
if (item != null)
{
items.Add(item);
_items[i] = null;
}
}
+
return items.ToArray();
}
}
@@ -92,7 +96,7 @@
{
lock (_items)
{
- return _items[(int)id];
+ return _items[(int) id];
}
}
}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index a5da955..d6e8799 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Internal.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar.Abstractions;
+ using Extensions;
+
public sealed class MessageBuilder : IMessageBuilder
{
private readonly IProducer _producer;
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
index 5e0803e..c2d14d7 100644
--- a/src/DotPulsar/Internal/MessagePackage.cs
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System.Buffers;
-
namespace DotPulsar.Internal
{
+ using System.Buffers;
+ using PulsarApi;
+
public struct MessagePackage
{
public MessagePackage(MessageIdData messageId, ReadOnlySequence<byte> data)
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index ca958b7..0c4b728 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -12,35 +12,43 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Exceptions;
+ using PulsarApi;
+
public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel, IReaderChannel
{
- public ValueTask DisposeAsync() => new ValueTask();
+ public ValueTask DisposeAsync()
+ => new ValueTask();
- public ValueTask<Message> Receive(CancellationToken cancellationToken = default) => throw GetException();
+ public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
+ => throw GetException();
- public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken) => throw GetException();
+ public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ => throw GetException();
- public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) =>
- throw GetException();
+ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ => throw GetException();
- public Task Send(CommandAck command, CancellationToken cancellationToken) => throw GetException();
+ public Task Send(CommandAck command, CancellationToken cancellationToken)
+ => throw GetException();
- public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken) => throw GetException();
+ public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
+ => throw GetException();
- public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken) => throw GetException();
+ public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
+ => throw GetException();
- public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) => throw GetException();
+ public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ => throw GetException();
- private Exception GetException() => new ChannelNotReadyException();
+ private Exception GetException()
+ => new ChannelNotReadyException();
}
}
diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs
index 0e8c7df..c6eb088 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Linq;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Events;
+
public sealed class ProcessManager : IRegisterEvent, IAsyncDisposable
{
private readonly ConcurrentDictionary<Guid, IProcess> _processes;
@@ -42,11 +42,12 @@
await _connectionPool.DisposeAsync().ConfigureAwait(false);
}
- public void Add(IProcess process) => _processes[process.CorrelationId] = process;
+ public void Add(IProcess process)
+ => _processes[process.CorrelationId] = process;
private async void Remove(Guid correlationId)
{
- if (_processes.TryRemove(correlationId, out IProcess process))
+ if (_processes.TryRemove(correlationId, out var process))
await process.DisposeAsync().ConfigureAwait(false);
}
@@ -76,10 +77,12 @@
DotPulsarEventSource.Log.ReaderDisposed();
break;
default:
- if (_processes.TryGetValue(e.CorrelationId, out IProcess process))
+ if (_processes.TryGetValue(e.CorrelationId, out var process))
process.Handle(e);
break;
- };
+ }
+
+ ;
}
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 2f94e13..e993e85 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -12,17 +12,17 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using Events;
+
public sealed class Producer : IProducer
{
private readonly Guid _correlationId;
@@ -55,9 +55,11 @@
public async ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
=> await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
- public bool IsFinalState() => _state.IsFinalState();
+ public bool IsFinalState()
+ => _state.IsFinalState();
- public bool IsFinalState(ProducerState state) => _state.IsFinalState(state);
+ public bool IsFinalState(ProducerState state)
+ => _state.IsFinalState(state);
public async ValueTask DisposeAsync()
{
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 09f1c7a..72a7c4f 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal
{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+
public sealed class ProducerBuilder : IProducerBuilder
{
private readonly IPulsarClient _pulsarClient;
@@ -53,11 +53,7 @@
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("ProducerOptions.Topic may not be null or empty");
- var options = new ProducerOptions(_topic!)
- {
- InitialSequenceId = _initialSequenceId,
- ProducerName = _producerName
- };
+ var options = new ProducerOptions(_topic!) { InitialSequenceId = _initialSequenceId, ProducerName = _producerName };
return _pulsarClient.CreateProducer(options);
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 0a9dff9..e281154 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -12,19 +12,19 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Extensions;
+ using PulsarApi;
+
public sealed class ProducerChannel : IProducerChannel
{
- private readonly PulsarApi.MessageMetadata _cachedMetadata;
+ private readonly MessageMetadata _cachedMetadata;
private readonly SendPackage _cachedSendPackage;
private readonly ulong _id;
private readonly SequenceId _sequenceId;
@@ -32,16 +32,9 @@
public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
{
- _cachedMetadata = new PulsarApi.MessageMetadata
- {
- ProducerName = name
- };
+ _cachedMetadata = new MessageMetadata { ProducerName = name };
- var commandSend = new CommandSend
- {
- ProducerId = id,
- NumMessages = 1
- };
+ var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 };
_cachedSendPackage = new SendPackage(commandSend, _cachedMetadata);
@@ -69,7 +62,7 @@
return await SendPackage(true, cancellationToken).ConfigureAwait(false);
}
- public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
metadata.ProducerName = _cachedMetadata.ProducerName;
_cachedSendPackage.Metadata = metadata;
@@ -81,7 +74,7 @@
{
try
{
- _cachedSendPackage.Metadata.PublishTime = (ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ _cachedSendPackage.Metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (autoAssignSequenceId)
{
@@ -89,7 +82,9 @@
_cachedSendPackage.Metadata.SequenceId = _sequenceId.Current;
}
else
+ {
_cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
+ }
var response = await _connection.Send(_cachedSendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
@@ -102,7 +97,8 @@
finally
{
if (autoAssignSequenceId)
- _cachedSendPackage.Metadata.SequenceId = 0; // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
+ _cachedSendPackage.Metadata.SequenceId =
+ 0; // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
}
}
}
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index ae8c15e..4cbd01a 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using PulsarApi;
+
public sealed class ProducerChannelFactory : IProducerChannelFactory
{
private readonly Guid _correlationId;
@@ -42,11 +42,7 @@
_executor = executor;
_sequenceId = new SequenceId(options.InitialSequenceId);
- _commandProducer = new CommandProducer
- {
- ProducerName = options.ProducerName,
- Topic = options.Topic
- };
+ _commandProducer = new CommandProducer { ProducerName = options.ProducerName, Topic = options.Topic };
}
public async Task<IProducerChannel> Create(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index a4e7d52..ef39c9b 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading.Tasks;
+ using Abstractions;
+
public sealed class ProducerProcess : Process
{
private readonly IStateManager<ProducerState> _stateManager;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index dc22d89..fd6870a 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -12,37 +12,33 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Collections.Generic;
-using System.Security.Cryptography.X509Certificates;
-using System.Text;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Collections.Generic;
+ using System.Security.Cryptography.X509Certificates;
+ using System.Text;
+ using System.Threading.Tasks;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using PulsarApi;
+
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
private readonly CommandConnect _commandConnect;
- private List<IHandleException> _exceptionHandlers;
+ private readonly List<IHandleException> _exceptionHandlers;
private EncryptionPolicy? _encryptionPolicy;
private TimeSpan _retryInterval;
private Uri _serviceUrl;
private X509Certificate2? _trustedCertificateAuthority;
- private X509Certificate2Collection _clientCertificates;
+ private readonly X509Certificate2Collection _clientCertificates;
private bool _verifyCertificateAuthority;
private bool _verifyCertificateName;
private TimeSpan _closeInactiveConnectionsInterval;
public PulsarClientBuilder()
{
- _commandConnect = new CommandConnect
- {
- ProtocolVersion = Constants.ProtocolVersion,
- ClientVersion = Constants.ClientVersion
- };
+ _commandConnect = new CommandConnect { ProtocolVersion = Constants.ProtocolVersion, ClientVersion = Constants.ClientVersion };
_exceptionHandlers = new List<IHandleException>();
_retryInterval = TimeSpan.FromSeconds(3);
@@ -131,7 +127,8 @@
_encryptionPolicy = EncryptionPolicy.EnforceUnencrypted;
if (_encryptionPolicy.Value == EncryptionPolicy.EnforceEncrypted)
- throw new ConnectionSecurityException($"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");
+ throw new ConnectionSecurityException(
+ $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarScheme}' and cannot be used with an encryption policy of 'EnforceEncrypted'");
}
else if (scheme == Constants.PulsarSslScheme)
{
@@ -139,17 +136,17 @@
_encryptionPolicy = EncryptionPolicy.EnforceEncrypted;
if (_encryptionPolicy.Value == EncryptionPolicy.EnforceUnencrypted)
- throw new ConnectionSecurityException($"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
+ throw new ConnectionSecurityException(
+ $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'");
}
else
+ {
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
+ }
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval);
- var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
- {
- new DefaultExceptionHandler(_retryInterval)
- };
+ var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
var processManager = new ProcessManager(connectionPool);
return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 6c7b330..d4c3f97 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -12,20 +12,20 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Exceptions;
-using DotPulsar.Internal.Extensions;
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.IO;
-using System.IO.Pipelines;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.IO.Pipelines;
+ using System.Runtime.CompilerServices;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Exceptions;
+ using Extensions;
+
public sealed class PulsarStream : IPulsarStream
{
private const long PauseAtMoreThan10Mb = 10485760;
@@ -101,6 +101,7 @@
_writer.Advance(bytesRead);
var result = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
+
if (result.IsCompleted)
break;
}
@@ -132,6 +133,7 @@
var frameSize = buffer.ReadUInt32(0, true);
var totalSize = frameSize + 4;
+
if (buffer.Length < totalSize)
break;
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 6c7d1c5..d559663 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -12,18 +12,18 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using System;
-using System.Collections.Generic;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Collections.Generic;
+ using System.Runtime.CompilerServices;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+ using Events;
+
public sealed class Reader : IReader
{
private readonly Guid _correlationId;
@@ -56,18 +56,18 @@
public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
=> await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
- public bool IsFinalState() => _state.IsFinalState();
+ public bool IsFinalState()
+ => _state.IsFinalState();
- public bool IsFinalState(ReaderState state) => _state.IsFinalState(state);
+ public bool IsFinalState(ReaderState state)
+ => _state.IsFinalState(state);
public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
{
ThrowIfDisposed();
while (!cancellationToken.IsCancellationRequested)
- {
yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
- }
}
public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index b04747b..16d99ab 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-
namespace DotPulsar.Internal
{
+ using DotPulsar.Abstractions;
+ using DotPulsar.Exceptions;
+
public sealed class ReaderBuilder : IReaderBuilder
{
private readonly IPulsarClient _pulsarClient;
@@ -71,12 +71,7 @@
if (string.IsNullOrEmpty(_topic))
throw new ConfigurationException("Topic may not be null or empty");
- var options = new ReaderOptions(_startMessageId, _topic!)
- {
- MessagePrefetchCount = _messagePrefetchCount,
- ReadCompacted = _readCompacted,
- ReaderName = _readerName
- };
+ var options = new ReaderOptions(_startMessageId, _topic!) { MessagePrefetchCount = _messagePrefetchCount, ReadCompacted = _readCompacted, ReaderName = _readerName };
return _pulsarClient.CreateReader(options);
}
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index d37319f..4458b7a 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using PulsarApi;
+
public sealed class ReaderChannelFactory : IReaderChannelFactory
{
private readonly Guid _correlationId;
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 3a7ec7d..845bf54 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading.Tasks;
+ using Abstractions;
+
public sealed class ReaderProcess : Process
{
private readonly IStateManager<ReaderState> _stateManager;
@@ -35,7 +35,7 @@
_reader = reader;
}
- public async override ValueTask DisposeAsync()
+ public override async ValueTask DisposeAsync()
{
_stateManager.SetState(ReaderState.Closed);
CancellationTokenSource.Cancel();
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index 6df4ceb..79c8533 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Threading.Tasks;
+ using PulsarApi;
+
public sealed class RequestResponseHandler : IDisposable
{
private const string ConnectResponseIdentifier = "Connected";
@@ -31,7 +31,8 @@
_requestId = 1;
}
- public void Dispose() => _responses.Dispose();
+ public void Dispose()
+ => _responses.Dispose();
public Task<BaseCommand> Outgoing(BaseCommand command)
{
@@ -42,6 +43,7 @@
public void Incoming(BaseCommand command)
{
var identifier = GetResponseIdentifier(command);
+
if (identifier != null)
_responses.SetResult(identifier, command);
}
@@ -85,24 +87,42 @@
switch (cmd.CommandType)
{
case BaseCommand.Type.Connect:
- case BaseCommand.Type.Connected: return ConnectResponseIdentifier;
- case BaseCommand.Type.Send: return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId.ToString();
- case BaseCommand.Type.SendError: return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId.ToString();
- case BaseCommand.Type.SendReceipt: return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId.ToString();
- case BaseCommand.Type.Error: return _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString();
- case BaseCommand.Type.Producer: return cmd.Producer.RequestId.ToString();
- case BaseCommand.Type.ProducerSuccess: return cmd.ProducerSuccess.RequestId.ToString();
- case BaseCommand.Type.CloseProducer: return cmd.CloseProducer.RequestId.ToString();
- case BaseCommand.Type.Lookup: return cmd.LookupTopic.RequestId.ToString();
- case BaseCommand.Type.LookupResponse: return cmd.LookupTopicResponse.RequestId.ToString();
- case BaseCommand.Type.Unsubscribe: return cmd.Unsubscribe.RequestId.ToString();
- case BaseCommand.Type.Subscribe: return cmd.Subscribe.RequestId.ToString();
- case BaseCommand.Type.Success: return cmd.Success.RequestId.ToString();
- case BaseCommand.Type.Seek: return cmd.Seek.RequestId.ToString();
- case BaseCommand.Type.CloseConsumer: return cmd.CloseConsumer.RequestId.ToString();
- case BaseCommand.Type.GetLastMessageId: return cmd.GetLastMessageId.RequestId.ToString();
- case BaseCommand.Type.GetLastMessageIdResponse: return cmd.GetLastMessageIdResponse.RequestId.ToString();
- default: throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
+ case BaseCommand.Type.Connected:
+ return ConnectResponseIdentifier;
+ case BaseCommand.Type.Send:
+ return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId;
+ case BaseCommand.Type.SendError:
+ return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId;
+ case BaseCommand.Type.SendReceipt:
+ return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId;
+ case BaseCommand.Type.Error:
+ return _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString();
+ case BaseCommand.Type.Producer:
+ return cmd.Producer.RequestId.ToString();
+ case BaseCommand.Type.ProducerSuccess:
+ return cmd.ProducerSuccess.RequestId.ToString();
+ case BaseCommand.Type.CloseProducer:
+ return cmd.CloseProducer.RequestId.ToString();
+ case BaseCommand.Type.Lookup:
+ return cmd.LookupTopic.RequestId.ToString();
+ case BaseCommand.Type.LookupResponse:
+ return cmd.LookupTopicResponse.RequestId.ToString();
+ case BaseCommand.Type.Unsubscribe:
+ return cmd.Unsubscribe.RequestId.ToString();
+ case BaseCommand.Type.Subscribe:
+ return cmd.Subscribe.RequestId.ToString();
+ case BaseCommand.Type.Success:
+ return cmd.Success.RequestId.ToString();
+ case BaseCommand.Type.Seek:
+ return cmd.Seek.RequestId.ToString();
+ case BaseCommand.Type.CloseConsumer:
+ return cmd.CloseConsumer.RequestId.ToString();
+ case BaseCommand.Type.GetLastMessageId:
+ return cmd.GetLastMessageId.RequestId.ToString();
+ case BaseCommand.Type.GetLastMessageIdResponse:
+ return cmd.GetLastMessageIdResponse.RequestId.ToString();
+ default:
+ throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type");
}
}
}
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
index ba71078..2acc07e 100644
--- a/src/DotPulsar/Internal/SendPackage.cs
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -12,21 +12,21 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System.Buffers;
-
namespace DotPulsar.Internal
{
+ using System.Buffers;
+ using PulsarApi;
+
public sealed class SendPackage
{
- public SendPackage(CommandSend command, PulsarApi.MessageMetadata metadata)
+ public SendPackage(CommandSend command, MessageMetadata metadata)
{
Command = command;
Metadata = metadata;
}
public CommandSend Command { get; }
- public PulsarApi.MessageMetadata Metadata { get; set; }
+ public MessageMetadata Metadata { get; set; }
public ReadOnlySequence<byte> Payload { get; set; }
}
}
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
index 7c231e8..4ad0a41 100644
--- a/src/DotPulsar/Internal/SequenceBuilder.cs
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -12,18 +12,19 @@
* limitations under the License.
*/
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Linq;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+ using System.Linq;
+
public sealed class SequenceBuilder<T> where T : notnull
{
private readonly LinkedList<ReadOnlyMemory<T>> _elements;
- public SequenceBuilder() => _elements = new LinkedList<ReadOnlyMemory<T>>();
+ public SequenceBuilder()
+ => _elements = new LinkedList<ReadOnlyMemory<T>>();
public SequenceBuilder<T> Prepend(ReadOnlyMemory<T> memory)
{
@@ -37,10 +38,9 @@
foreach (var memory in sequence)
{
- if (index is null)
- index = _elements.AddFirst(memory);
- else
- index = _elements.AddAfter(index, memory);
+ index = index is null
+ ? _elements.AddFirst(memory)
+ : _elements.AddAfter(index, memory);
}
return this;
@@ -78,7 +78,9 @@
start = current;
}
else
+ {
current = current.CreateNext(element);
+ }
}
return new ReadOnlySequence<T>(start, 0, current, current!.Memory.Length);
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 4499dbf..5b41d7d 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -19,12 +19,14 @@
public SequenceId(ulong initialSequenceId)
{
Current = initialSequenceId;
+
if (initialSequenceId > 0)
Increment();
}
public ulong Current { get; private set; }
- public void Increment() => ++Current;
+ public void Increment()
+ => ++Current;
}
}
diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs
index 093e6ce..ca5f108 100644
--- a/src/DotPulsar/Internal/Serializer.cs
+++ b/src/DotPulsar/Internal/Serializer.cs
@@ -12,26 +12,27 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System;
-using System.Buffers;
-using System.IO;
-
namespace DotPulsar.Internal
{
+ using System;
+ using System.Buffers;
+ using System.IO;
+ using PulsarApi;
+
public static class Serializer
{
public static T Deserialize<T>(ReadOnlySequence<byte> sequence)
{
- using var ms = new MemoryStream(sequence.ToArray()); //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
+ //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
+ using var ms = new MemoryStream(sequence.ToArray());
return ProtoBuf.Serializer.Deserialize<T>(ms);
}
public static ReadOnlySequence<byte> Serialize(BaseCommand command)
{
var commandBytes = Serialize<BaseCommand>(command);
- var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
- var totalSizeBytes = ToBigEndianBytes((uint)commandBytes.Length + 4);
+ var commandSizeBytes = ToBigEndianBytes((uint) commandBytes.Length);
+ var totalSizeBytes = ToBigEndianBytes((uint) commandBytes.Length + 4);
return new SequenceBuilder<byte>()
.Append(totalSizeBytes)
@@ -40,13 +41,13 @@
.Build();
}
- public static ReadOnlySequence<byte> Serialize(BaseCommand command, PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload)
+ public static ReadOnlySequence<byte> Serialize(BaseCommand command, MessageMetadata metadata, ReadOnlySequence<byte> payload)
{
var commandBytes = Serialize<BaseCommand>(command);
- var commandSizeBytes = ToBigEndianBytes((uint)commandBytes.Length);
+ var commandSizeBytes = ToBigEndianBytes((uint) commandBytes.Length);
var metadataBytes = Serialize(metadata);
- var metadataSizeBytes = ToBigEndianBytes((uint)metadataBytes.Length);
+ var metadataSizeBytes = ToBigEndianBytes((uint) metadataBytes.Length);
var sb = new SequenceBuilder<byte>().Append(metadataSizeBytes).Append(metadataBytes).Append(payload);
var checksum = Crc32C.Calculate(sb.Build());
@@ -55,17 +56,17 @@
.Prepend(Constants.MagicNumber)
.Prepend(commandBytes)
.Prepend(commandSizeBytes)
- .Prepend(ToBigEndianBytes((uint)sb.Length))
+ .Prepend(ToBigEndianBytes((uint) sb.Length))
.Build();
}
public static byte[] ToBigEndianBytes(uint integer)
{
var union = new UIntUnion(integer);
- if (BitConverter.IsLittleEndian)
- return new[] { union.B3, union.B2, union.B1, union.B0 };
- else
- return new[] { union.B0, union.B1, union.B2, union.B3 };
+
+ return BitConverter.IsLittleEndian
+ ? new[] { union.B3, union.B2, union.B1, union.B0 }
+ : new[] { union.B0, union.B1, union.B2, union.B3 };
}
private static byte[] Serialize<T>(T item)
diff --git a/src/DotPulsar/Internal/StateManager.cs b/src/DotPulsar/Internal/StateManager.cs
index e2f1c0b..f9fab5d 100644
--- a/src/DotPulsar/Internal/StateManager.cs
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Abstractions;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+
public sealed class StateManager<TState> : IStateManager<TState> where TState : notnull
{
private readonly object _lock;
@@ -84,6 +84,7 @@
return false;
}
- public bool IsFinalState() => IsFinalState(CurrentState);
+ public bool IsFinalState()
+ => IsFinalState(CurrentState);
}
}
diff --git a/src/DotPulsar/Internal/StateTask.cs b/src/DotPulsar/Internal/StateTask.cs
index 452276d..dbd5a08 100644
--- a/src/DotPulsar/Internal/StateTask.cs
+++ b/src/DotPulsar/Internal/StateTask.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System;
-
namespace DotPulsar.Internal
{
+ using System;
+
public sealed class StateTask<TState> : IDisposable where TState : notnull
{
private readonly TState _state;
@@ -30,7 +30,8 @@
public CancelableCompletionSource<TState> CancelableCompletionSource { get; }
- public void Dispose() => CancelableCompletionSource.Dispose();
+ public void Dispose()
+ => CancelableCompletionSource.Dispose();
public bool IsAwaiting(TState state)
{
diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs
index 418c5e6..d8d2547 100644
--- a/src/DotPulsar/Internal/StateTaskCollection.cs
+++ b/src/DotPulsar/Internal/StateTaskCollection.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar.Internal
{
+ using System.Collections.Generic;
+ using System.Threading;
+ using System.Threading.Tasks;
+
public sealed class StateTaskCollection<TState> where TState : notnull
{
private readonly object _lock;
@@ -47,15 +47,18 @@
lock (_lock)
{
var awaitor = _awaitors.First;
+
while (awaitor != null)
{
var next = awaitor.Next;
+
if (awaitor.Value.IsAwaiting(state))
{
_awaitors.Remove(awaitor);
awaitor.Value.CancelableCompletionSource.SetResult(state);
awaitor.Value.CancelableCompletionSource.Dispose();
}
+
awaitor = next;
}
}
@@ -70,6 +73,7 @@
awaitor.CancelableCompletionSource.SetResult(state);
awaitor.CancelableCompletionSource.Dispose();
}
+
_awaitors.Clear();
}
}
diff --git a/src/DotPulsar/Internal/SubscribeResponse.cs b/src/DotPulsar/Internal/SubscribeResponse.cs
index db135d1..472fddd 100644
--- a/src/DotPulsar/Internal/SubscribeResponse.cs
+++ b/src/DotPulsar/Internal/SubscribeResponse.cs
@@ -16,7 +16,8 @@
{
public sealed class SubscribeResponse
{
- public SubscribeResponse(ulong consumerId) => ConsumerId = consumerId;
+ public SubscribeResponse(ulong consumerId)
+ => ConsumerId = consumerId;
public ulong ConsumerId { get; }
}
diff --git a/src/DotPulsar/Internal/UIntUnion.cs b/src/DotPulsar/Internal/UIntUnion.cs
index 21dc4e4..96b3a10 100644
--- a/src/DotPulsar/Internal/UIntUnion.cs
+++ b/src/DotPulsar/Internal/UIntUnion.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System.Runtime.InteropServices;
-
namespace DotPulsar.Internal
{
+ using System.Runtime.InteropServices;
+
[StructLayout(LayoutKind.Explicit)]
public struct UIntUnion
{
@@ -39,10 +39,13 @@
[FieldOffset(0)]
public byte B0;
+
[FieldOffset(1)]
public byte B1;
+
[FieldOffset(2)]
public byte B2;
+
[FieldOffset(3)]
public byte B3;
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 53d0693..2c0adec 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -12,22 +12,23 @@
* limitations under the License.
*/
-using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Linq;
-
namespace DotPulsar
{
+ using System;
+ using System.Buffers;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Internal.PulsarApi;
+
public sealed class Message
{
- private readonly List<Internal.PulsarApi.KeyValue> _keyValues;
+ private readonly List<KeyValue> _keyValues;
private IReadOnlyDictionary<string, string>? _properties;
internal Message(
MessageId messageId,
Internal.PulsarApi.MessageMetadata metadata,
- Internal.PulsarApi.SingleMessageMetadata? singleMetadata,
+ SingleMessageMetadata? singleMetadata,
ReadOnlySequence<byte> data)
{
MessageId = messageId;
@@ -62,7 +63,7 @@
public bool HasEventTime => EventTime != 0;
public ulong EventTime { get; }
- public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)EventTime);
+ public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) EventTime);
public bool HasBase64EncodedKey { get; }
public bool HasKey => Key != null;
@@ -72,9 +73,8 @@
public bool HasOrderingKey => OrderingKey != null;
public byte[]? OrderingKey { get; }
-
public ulong PublishTime { get; }
- public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)PublishTime);
+ public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long) PublishTime);
public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyValues.ToDictionary(p => p.Key, p => p.Value);
}
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index e2c26c1..9e90967 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal.PulsarApi;
-using System;
-
namespace DotPulsar
{
+ using System;
+ using Internal.PulsarApi;
+
public sealed class MessageId : IEquatable<MessageId>
{
static MessageId()
@@ -28,18 +28,11 @@
public static MessageId Earliest { get; }
public static MessageId Latest { get; }
- internal MessageId(MessageIdData messageIdData) => Data = messageIdData;
+ internal MessageId(MessageIdData messageIdData)
+ => Data = messageIdData;
public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
- {
- Data = new MessageIdData
- {
- LedgerId = ledgerId,
- EntryId = entryId,
- Partition = partition,
- BatchIndex = batchIndex
- };
- }
+ => Data = new MessageIdData { LedgerId = ledgerId, EntryId = entryId, Partition = partition, BatchIndex = batchIndex };
internal MessageIdData Data { get; }
@@ -48,13 +41,22 @@
public int Partition => Data.Partition;
public int BatchIndex => Data.BatchIndex;
- public override bool Equals(object o) => o is MessageId ? Equals((MessageId)o) : false;
- public bool Equals(MessageId other) => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
+ public override bool Equals(object o)
+ => o is MessageId ? Equals((MessageId) o) : false;
- public static bool operator ==(MessageId x, MessageId y) => x.Equals(y);
- public static bool operator !=(MessageId x, MessageId y) => !x.Equals(y);
+ public bool Equals(MessageId other)
+ => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
- public override int GetHashCode() => HashCode.Combine(LedgerId, EntryId, Partition, BatchIndex);
- public override string ToString() => $"LedgerId: {LedgerId}, EntryId: {EntryId}, Partition: {Partition}, BatchIndex: {BatchIndex}";
+ public static bool operator ==(MessageId x, MessageId y)
+ => x.Equals(y);
+
+ public static bool operator !=(MessageId x, MessageId y)
+ => !x.Equals(y);
+
+ public override int GetHashCode()
+ => HashCode.Combine(LedgerId, EntryId, Partition, BatchIndex);
+
+ public override string ToString()
+ => $"LedgerId: {LedgerId}, EntryId: {EntryId}, Partition: {Partition}, BatchIndex: {BatchIndex}";
}
}
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index 98cd366..2af466e 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -12,14 +12,16 @@
* limitations under the License.
*/
-using DotPulsar.Internal.Extensions;
-using System;
-
namespace DotPulsar
{
+ using System;
+ using Internal.Extensions;
+ using Internal.PulsarApi;
+
public sealed class MessageMetadata
{
- public MessageMetadata() => Metadata = new Internal.PulsarApi.MessageMetadata();
+ public MessageMetadata()
+ => Metadata = new Internal.PulsarApi.MessageMetadata();
internal Internal.PulsarApi.MessageMetadata Metadata;
@@ -72,6 +74,7 @@
for (var i = 0; i < Metadata.Properties.Count; ++i)
{
var keyValye = Metadata.Properties[i];
+
if (keyValye.Key == key)
return keyValye.Value;
}
@@ -83,13 +86,16 @@
for (var i = 0; i < Metadata.Properties.Count; ++i)
{
var keyValye = Metadata.Properties[i];
+
if (keyValye.Key != key)
continue;
+
keyValye.Value = value;
+
return;
}
- Metadata.Properties.Add(new Internal.PulsarApi.KeyValue { Key = key, Value = value });
+ Metadata.Properties.Add(new KeyValue { Key = key, Value = value });
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index ad5f0e5..46dcfa9 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -12,16 +12,16 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal;
-using DotPulsar.Internal.Abstractions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
namespace DotPulsar
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Exceptions;
+ using Internal;
+ using Internal.Abstractions;
+
public sealed class PulsarClient : IPulsarClient
{
private readonly IConnectionPool _connectionPool;
@@ -38,7 +38,8 @@
DotPulsarEventSource.Log.ClientCreated();
}
- public static IPulsarClientBuilder Builder() => new PulsarClientBuilder();
+ public static IPulsarClientBuilder Builder()
+ => new PulsarClientBuilder();
public IProducer CreateProducer(ProducerOptions options)
{
@@ -60,7 +61,9 @@
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
- var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+
+ var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic,
+ ConsumerState.Faulted);
var consumer = new Consumer(correlationId, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
diff --git a/tests/DotPulsar.StressTests/ConnectionTests.cs b/tests/DotPulsar.StressTests/ConnectionTests.cs
index 3a5f694..cafc5b6 100644
--- a/tests/DotPulsar.StressTests/ConnectionTests.cs
+++ b/tests/DotPulsar.StressTests/ConnectionTests.cs
@@ -12,22 +12,23 @@
* limitations under the License.
*/
-using DotPulsar.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using DotPulsar.StressTests.Fixtures;
-using Xunit;
-using Xunit.Abstractions;
-
namespace DotPulsar.StressTests
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Extensions;
+ using Fixtures;
+ using Xunit;
+ using Xunit.Abstractions;
+
[Collection(nameof(StandaloneClusterTest))]
public class ConnectionTests
{
private readonly ITestOutputHelper _output;
- public ConnectionTests(ITestOutputHelper output) => _output = output;
+ public ConnectionTests(ITestOutputHelper output)
+ => _output = output;
[Theory]
[InlineData("pulsar://localhost:54545")] // test that we can connect directly to a broker
@@ -43,9 +44,7 @@
.ExceptionHandler(new XunitExceptionHandler(_output));
if (!string.IsNullOrEmpty(serviceUrl))
- {
builder.ServiceUrl(new Uri(serviceUrl));
- }
await using var client = builder.Build();
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index ae84601..db3de6e 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -12,26 +12,27 @@
* limitations under the License.
*/
-using DotPulsar.Extensions;
-using FluentAssertions;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using DotPulsar.StressTests.Fixtures;
-using Xunit;
-using Xunit.Abstractions;
-
namespace DotPulsar.StressTests
{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Text;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Extensions;
+ using Fixtures;
+ using FluentAssertions;
+ using Xunit;
+ using Xunit.Abstractions;
+
[Collection(nameof(StandaloneClusterTest))]
public class ConsumerTests
{
private readonly ITestOutputHelper _output;
- public ConsumerTests(ITestOutputHelper output) => _output = output;
+ public ConsumerTests(ITestOutputHelper output)
+ => _output = output;
[Theory]
[InlineData(10000)]
@@ -83,7 +84,8 @@
{
ids.Add(message.MessageId);
- if (ids.Count != numberOfMessages) continue;
+ if (ids.Count != numberOfMessages)
+ continue;
await consumer.AcknowledgeCumulative(message, ct).ConfigureAwait(false);
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 0558fac..7f6eca1 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -12,13 +12,16 @@
* limitations under the License.
*/
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading.Tasks;
+#pragma warning disable 8601
+#pragma warning disable 8625
namespace DotPulsar.StressTests
{
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Linq;
+ using System.Threading.Tasks;
+
public static class EnumerableValueTaskExtensions
{
[DebuggerStepThrough]
@@ -47,19 +50,19 @@
public static async IAsyncEnumerable<TResult> Enumerate<TResult>(this IEnumerable<ValueTask<TResult>> source)
{
foreach (var operation in source.Select(GetInfo))
+ {
yield return operation.IsTask
? await operation.Task.ConfigureAwait(false)
: operation.Result;
+ }
}
private static ValueTaskInfo<TResult> GetInfo<TResult>(this ValueTask<TResult> source)
- {
- return source.IsCompleted
+ => source.IsCompleted
? new ValueTaskInfo<TResult>(source.Result)
: new ValueTaskInfo<TResult>(source.AsTask());
- }
- private struct ValueTaskInfo<TResult>
+ private readonly struct ValueTaskInfo<TResult>
{
public ValueTaskInfo(Task<TResult> task)
{
@@ -84,9 +87,11 @@
public static class EnumerableTaskExtensions
{
[DebuggerStepThrough]
- public static Task WhenAll(this IEnumerable<Task> source) => Task.WhenAll(source);
+ public static Task WhenAll(this IEnumerable<Task> source)
+ => Task.WhenAll(source);
[DebuggerStepThrough]
- public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> source) => Task.WhenAll(source);
+ public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> source)
+ => Task.WhenAll(source);
}
}
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index 7583fc6..2874bfd 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -12,28 +12,25 @@
* limitations under the License.
*/
-using System;
-using System.Diagnostics;
-using System.Net.Http;
-using System.Threading.Tasks;
-using Xunit;
-
namespace DotPulsar.StressTests.Fixtures
{
+ using System;
+ using System.Diagnostics;
+ using System.Net.Http;
+ using System.Threading.Tasks;
+ using Xunit;
+
public class StandaloneClusterFixture : IAsyncLifetime
{
public async Task InitializeAsync()
{
- TakeDownPulsar(); // clean-up if anything was left running from previous run
+ TakeDownPulsar(); // clean-up if anything was left running from previous run
RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml up -d");
var waitTries = 10;
- using var handler = new HttpClientHandler
- {
- AllowAutoRedirect = true
- };
+ using var handler = new HttpClientHandler { AllowAutoRedirect = true };
using var client = new HttpClient(handler);
@@ -65,11 +62,7 @@
private static void RunProcess(string name, string arguments)
{
- var processStartInfo = new ProcessStartInfo()
- {
- FileName = name,
- Arguments = arguments
- };
+ var processStartInfo = new ProcessStartInfo { FileName = name, Arguments = arguments };
processStartInfo.Environment["TAG"] = "test";
processStartInfo.Environment["CONFIGURATION"] = "Debug";
@@ -80,9 +73,7 @@
process.WaitForExit();
if (process.ExitCode != 0)
- {
throw new Exception($"Exit code {process.ExitCode} when running process {name} with arguments {arguments}");
- }
}
}
}
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs
index e603fb5..cb2de57 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterTests.cs
@@ -12,13 +12,10 @@
* limitations under the License.
*/
-using Xunit;
-
namespace DotPulsar.StressTests.Fixtures
{
- [CollectionDefinition(nameof(StandaloneClusterTest))]
- public class StandaloneClusterTest : ICollectionFixture<StandaloneClusterFixture>
- {
+ using Xunit;
- }
+ [CollectionDefinition(nameof(StandaloneClusterTest))]
+ public class StandaloneClusterTest : ICollectionFixture<StandaloneClusterFixture> { }
}
diff --git a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
index 159781d..e381c95 100644
--- a/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
+++ b/tests/DotPulsar.StressTests/XunitExceptionHandler.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Abstractions;
-using DotPulsar.Internal;
-using System;
-using System.Threading.Tasks;
-using Xunit.Abstractions;
-
namespace DotPulsar.StressTests
{
+ using System;
+ using System.Threading.Tasks;
+ using Abstractions;
+ using Internal;
+ using Xunit.Abstractions;
+
internal class XunitExceptionHandler : IHandleException
{
private readonly ITestOutputHelper _output;
@@ -31,9 +31,7 @@
_exceptionHandler = exceptionHandler;
}
- public XunitExceptionHandler(ITestOutputHelper output) : this(output, new DefaultExceptionHandler(TimeSpan.FromSeconds(3)))
- {
- }
+ public XunitExceptionHandler(ITestOutputHelper output) : this(output, new DefaultExceptionHandler(TimeSpan.FromSeconds(3))) { }
public async ValueTask OnException(ExceptionContext exceptionContext)
{
diff --git a/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs b/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
index adce63f..bb7a151 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -12,15 +12,14 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using DotPulsar.Internal.Exceptions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-
namespace DotPulsar.Tests.Internal
{
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar.Internal;
+ using DotPulsar.Internal.Exceptions;
+ using Xunit;
+
public class AsyncLockTests
{
[Fact]
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index 290932c..d7afda4 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-
namespace DotPulsar.Tests.Internal
{
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar.Internal;
+ using Xunit;
+
public class AsyncQueueTests
{
[Fact]
diff --git a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
index 687b6d4..2c3b27e 100644
--- a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using Xunit;
-
namespace DotPulsar.Tests.Internal
{
+ using DotPulsar.Internal;
+ using Xunit;
+
public class Crc32CTests
{
[Fact]
@@ -38,7 +38,11 @@
public void Calculate_GivenSequenceWithMultipleSegments_ShouldReturnExpectedChecksum()
{
//Arrange
- var s1 = new byte[] { 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01 };
+ var s1 = new byte[]
+ {
+ 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c,
+ 0x58, 0x01
+ };
var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c };
var sequence = new SequenceBuilder<byte>().Append(s1).Append(s2).Build();
diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
index 4d4f900..91f2c7b 100644
--- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using DotPulsar.Internal.Extensions;
-using Xunit;
-
namespace DotPulsar.Tests.Internal.Extensions
{
+ using DotPulsar.Internal;
+ using DotPulsar.Internal.Extensions;
+ using Xunit;
+
public class ReadOnlySequenceExtensionsTests
{
[Fact]
diff --git a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
index ede8b2d..1a2ae51 100644
--- a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs
@@ -12,12 +12,12 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using System.Buffers;
-using Xunit;
-
namespace DotPulsar.Tests.Internal
{
+ using System.Buffers;
+ using DotPulsar.Internal;
+ using Xunit;
+
public class SequenceBuilderTests
{
[Fact]
@@ -34,6 +34,7 @@
//Assert
var array = sequence.ToArray();
+
for (byte i = 0; i < array.Length; ++i)
Assert.Equal(i, array[i]);
}
@@ -55,6 +56,7 @@
//Assert
var array = sequence.ToArray();
+
for (byte i = 0; i < array.Length; ++i)
Assert.Equal(i, array[i]);
}
@@ -73,6 +75,7 @@
//Assert
var array = sequence.ToArray();
+
for (byte i = 0; i < array.Length; ++i)
Assert.Equal(i, array[i]);
}
@@ -94,6 +97,7 @@
//Assert
var array = sequence.ToArray();
+
for (byte i = 0; i < array.Length; ++i)
Assert.Equal(i, array[i]);
}
diff --git a/tests/DotPulsar.Tests/Internal/SerializerTests.cs b/tests/DotPulsar.Tests/Internal/SerializerTests.cs
index 81375a2..87df2b3 100644
--- a/tests/DotPulsar.Tests/Internal/SerializerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/SerializerTests.cs
@@ -12,11 +12,11 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using Xunit;
-
namespace DotPulsar.Tests.Internal
{
+ using DotPulsar.Internal;
+ using Xunit;
+
public class SerializerTests
{
[Fact]
diff --git a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
index 152b55b..9ea4aa9 100644
--- a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -12,13 +12,13 @@
* limitations under the License.
*/
-using DotPulsar.Internal;
-using System.Threading;
-using System.Threading.Tasks;
-using Xunit;
-
namespace DotPulsar.Tests.Internal
{
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DotPulsar.Internal;
+ using Xunit;
+
public class StateManagerTests
{
[Theory]