Update Pulsar to 2.11.0 (#123)
* Update Pulsar to 2.11.0
* Add Mac M1 support for running ITs
* Update Reactor to 3.5.4
- Handle removed Schedulers.elastic() by
switching usages of it to Schedulers.boundedElastic()
* Update to Jackson 2.14.2
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 98a80ed..1aa80f6 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -17,16 +17,16 @@
# under the License.
#
[versions]
-pulsar = "2.10.2"
+pulsar = "2.11.0"
junit-jupiter = "5.8.2"
log4j = "2.18.0"
slf4j = "1.7.36"
-reactor = "3.4.22"
+reactor = "3.5.4"
assertj = "3.23.1"
testcontainers = "1.17.3"
jctools = "3.3.0"
caffeine = "2.9.3"
-jackson = "2.13.4"
+jackson = "2.14.2"
checkstyle = '8.45.1'
spring-javaformat = '0.0.34'
licenser = "0.6.1"
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
index 61202a0..2d88f8d 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.reactive.client.adapter;
+import java.util.Locale;
+
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testcontainers.containers.PulsarContainer;
@@ -31,9 +33,8 @@
}
/** The singleton instance for Pulsar container. */
- static PulsarContainer PULSAR_CONTAINER = new PulsarContainer(
- DockerImageName.parse("apachepulsar/pulsar").withTag("2.10.2"))
- .withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
+ static PulsarContainer PULSAR_CONTAINER = new PulsarContainer(getPulsarImage())
+ .withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
static {
PULSAR_CONTAINER.start();
@@ -44,4 +45,22 @@
.build();
}
+ static DockerImageName getPulsarImage() {
+ return isRunningOnMacM1() ? getMacM1PulsarImage() : getStandardPulsarImage();
+ }
+
+ private static boolean isRunningOnMacM1() {
+ String osName = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
+ String osArchitecture = System.getProperty("os.arch").toLowerCase(Locale.ENGLISH);
+ return osName.contains("mac") && osArchitecture.equals("aarch64");
+ }
+
+ private static DockerImageName getStandardPulsarImage() {
+ return DockerImageName.parse("apachepulsar/pulsar:2.11.0");
+ }
+
+ private static DockerImageName getMacM1PulsarImage() {
+ return DockerImageName.parse("kezhenxu94/pulsar").asCompatibleSubstituteFor("apachepulsar/pulsar");
+ }
+
}
diff --git a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
index 2bbb1bd..54691e1 100644
--- a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
+++ b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
@@ -98,9 +98,8 @@
case "single":
return Schedulers.single();
case "boundedElastic":
- return Schedulers.boundedElastic();
case "elastic":
- return Schedulers.elastic();
+ return Schedulers.boundedElastic();
case "immediate":
return Schedulers.immediate();
default:
diff --git a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
index 02f7a46..8fd7501 100644
--- a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
+++ b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
@@ -450,7 +450,7 @@
}
@ParameterizedTest
- @ValueSource(strings = { "parallel", "elastic", "boundedElastic", "immediate", "single" })
+ @ValueSource(strings = { "parallel", "boundedElastic", "immediate", "single" })
void shouldSerDeserScheduler(String scheduler) throws Exception {
String content = (String.format("\"%s\"", scheduler));
Scheduler policy = MAPPER.readValue(content, Scheduler.class);
@@ -459,6 +459,13 @@
}
@Test
+ void shouldSerDeserDeprecatedElasticScheduler() throws Exception {
+ Scheduler policy = MAPPER.readValue("\"elastic\"", Scheduler.class);
+ String json = MAPPER.writeValueAsString(policy);
+ assertThat(json).isEqualTo("\"boundedElastic\"");
+ }
+
+ @Test
void shouldSerializeCustomScheduler() throws Exception {
String json = MAPPER.writeValueAsString(new TestScheduler());
String expected = "\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestScheduler\"";