Fixed all things
diff --git a/docker-compose.yml b/docker-compose.yml
index 4683497..2b05428 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -4,7 +4,7 @@
pulsar:
container_name: pulsar
- image: apachepulsar/pulsar
+ image: 'apachepulsar/pulsar:2.5.0'
ports:
- '8080:8080'
- '6650:6650'
@@ -12,22 +12,21 @@
- 8080
- 6650
environment:
- PULSAR_MEM: " -Xms8g -Xmx8g -XX:MaxDirectMemorySize=8g"
- command: >
- /bin/bash -c
- "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
+ PULSAR_MEM: " -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
+ command: |
+ /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
pulsar-express:
container_name: pulsar-express
- image: bbonnin/pulsar-express
+ image: bbonnin/pulsar-express:0.5.1
depends_on:
- pulsar
ports:
- '3000:3000'
environment:
- SERVICE_URL: http://pulsar:8080
+ SERVICE_URL: 'http://pulsar:8080'
networks:
default:
name: pulsar-network
- driver: bridge
\ No newline at end of file
+ driver: bridge
diff --git a/src/DotPulsar.Stress.Tests/ConsumerTests.cs b/src/DotPulsar.Stress.Tests/ConsumerTests.cs
index 0ce32e5..cdfe6ab 100644
--- a/src/DotPulsar.Stress.Tests/ConsumerTests.cs
+++ b/src/DotPulsar.Stress.Tests/ConsumerTests.cs
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
using System.Collections.Generic;
using System.Linq;
diff --git a/src/DotPulsar.Stress.Tests/EnumerableTaskExtensions.cs b/src/DotPulsar.Stress.Tests/EnumerableTaskExtensions.cs
index 6db693d..b7222e9 100644
--- a/src/DotPulsar.Stress.Tests/EnumerableTaskExtensions.cs
+++ b/src/DotPulsar.Stress.Tests/EnumerableTaskExtensions.cs
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
diff --git a/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs b/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
index e295280..95a9fd8 100644
--- a/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
+++ b/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
@@ -1,3 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
using System.Threading.Tasks;
using DotPulsar.Abstractions;
diff --git a/src/DotPulsar/Internal/Abstractions/IEvent.cs b/src/DotPulsar/Internal/Abstractions/IEvent.cs
index 7924e8f..4b1ef8b 100644
--- a/src/DotPulsar/Internal/Abstractions/IEvent.cs
+++ b/src/DotPulsar/Internal/Abstractions/IEvent.cs
@@ -18,6 +18,6 @@
{
public interface IEvent
{
- public Guid CorrelationId { get; }
+ Guid CorrelationId { get; }
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProcess.cs b/src/DotPulsar/Internal/Abstractions/IProcess.cs
index aea0447..9b3e0e3 100644
--- a/src/DotPulsar/Internal/Abstractions/IProcess.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProcess.cs
@@ -18,8 +18,9 @@
{
public interface IProcess : IAsyncDisposable
{
- public Guid CorrelationId { get; }
- public void Start();
- public void Handle(IEvent @event);
+ Guid CorrelationId { get; }
+
+ void Start();
+ void Handle(IEvent @event);
}
-}
+}
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 646168c..d4ab7cc 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -63,9 +63,10 @@
var messagePackage = await _queue.Dequeue(cancellationToken);
if (!messagePackage.IsValid())
+ {
+ await RejectPackage(messagePackage);
continue;
-
- await AcknowledgePackage(messagePackage);
+ }
var metadataSize = messagePackage.GetMetadataSize();
var data = messagePackage.ExtractData(metadataSize);
@@ -145,7 +146,7 @@
_sendWhenZero = _cachedCommandFlow.MessagePermits;
}
- private async Task AcknowledgePackage(MessagePackage messagePackage)
+ private async Task RejectPackage(MessagePackage messagePackage)
{
var ack = new CommandAck
{
diff --git a/src/DotPulsar/Internal/MessagePackageExtensions.cs b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
similarity index 95%
rename from src/DotPulsar/Internal/MessagePackageExtensions.cs
rename to src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
index 81f89e7..45afe67 100644
--- a/src/DotPulsar/Internal/MessagePackageExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessagePackageExtensions.cs
@@ -13,9 +13,8 @@
*/
using System.Buffers;
-using DotPulsar.Internal.Extensions;
-namespace DotPulsar.Internal
+namespace DotPulsar.Internal.Extensions
{
public static class MessagePackageExtensions
{