Add Joda time logical type conversion. (#6704)
### Motivation
After upgrade to Apache Avro 1.9.x, the default time conversion changed to JSR-310. For forwarding compatibility, we'd better add the Joda time conversion.
related to #5938
### Modifications
Add joda time conversions
### Verifying this change
New integration test added
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 4a3016e..3151824 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -133,6 +133,12 @@
</dependency>
<dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
<version>${presto.version}</version>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java
new file mode 100644
index 0000000..49a3f8a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import com.google.common.collect.Sets;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+public class JodaTimeTest extends PulsarTestSuite {
+
+ private PulsarClient client;
+ private PulsarAdmin admin;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ this.client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ this.admin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+ .build();
+ }
+
+ @Data
+ private static class JodaSchema {
+
+ @org.apache.avro.reflect.AvroSchema("{\n" +
+ " \"type\": \"bytes\",\n" +
+ " \"logicalType\": \"decimal\",\n" +
+ " \"precision\": 4,\n" +
+ " \"scale\": 2\n" +
+ "}")
+ BigDecimal decimal;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}")
+ LocalDate date;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
+ DateTime timestampMillis;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}")
+ LocalTime timeMillis;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
+ long timestampMicros;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}")
+ long timeMicros;
+ }
+
+ @Test
+ public void testJodaTime() throws PulsarAdminException, PulsarClientException {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topic = "test-joda-time-schema";
+ final String fqtn = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topic
+ ).toString();
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(pulsarCluster.getClusterName())
+ );
+
+ JodaSchema forSend = new JodaSchema();
+ forSend.setDecimal(new BigDecimal("12.34"));
+ forSend.setTimeMicros(System.currentTimeMillis() * 1000);
+ forSend.setTimestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC()));
+ forSend.setTimeMillis(LocalTime.now());
+ forSend.setTimeMicros(System.currentTimeMillis() * 1000);
+ forSend.setDate(LocalDate.now());
+
+ Producer<JodaSchema> producer = client
+ .newProducer(Schema.AVRO(JodaSchema.class))
+ .topic(fqtn)
+ .create();
+
+ Consumer<JodaSchema> consumer = client
+ .newConsumer(Schema.AVRO(JodaSchema.class))
+ .topic(fqtn)
+ .subscriptionName("test")
+ .subscribe();
+
+ producer.send(forSend);
+ JodaSchema received = consumer.receive().getValue();
+ assertEquals(received, forSend);
+
+ producer.close();
+ consumer.close();
+
+ log.info("Successfully Joda time logical type message : {}", received);
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 70a4692..4b0083a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -193,7 +193,8 @@
.build();
Producer<AvroLogicalType> producer = client
- .newProducer(Schema.AVRO(AvroLogicalType.class))
+ .newProducer(Schema.AVRO(SchemaDefinition.<AvroLogicalType>builder().withPojo(AvroLogicalType.class)
+ .withJSR310ConversionEnabled(true).build()))
.topic(fqtn)
.create();
@@ -207,12 +208,7 @@
log.info("Successfully published avro logical type message : {}", messageForSend);
AvroLogicalType received = consumer.receive().getValue();
- assertEquals(messageForSend.getDecimal(), received.getDecimal());
- assertEquals(messageForSend.getTimeMicros(), received.getTimeMicros());
- assertEquals(messageForSend.getTimeMillis(), received.getTimeMillis());
- assertEquals(messageForSend.getTimestampMicros(), received.getTimestampMicros());
- assertEquals(messageForSend.getTimestampMillis(), received.getTimestampMillis());
- assertEquals(messageForSend.getDate(), received.getDate());
+ assertEquals(received, messageForSend);
producer.close();
consumer.close();
diff --git a/tests/integration/src/test/resources/pulsar-schema.xml b/tests/integration/src/test/resources/pulsar-schema.xml
index 7b3b21e..5917805 100644
--- a/tests/integration/src/test/resources/pulsar-schema.xml
+++ b/tests/integration/src/test/resources/pulsar-schema.xml
@@ -23,6 +23,7 @@
<test name="pulsar-schema-test-suite" preserve-order="true" >
<classes>
<class name="org.apache.pulsar.tests.integration.schema.SchemaTest" />
+ <class name="org.apache.pulsar.tests.integration.schema.JodaTimeTest" />
</classes>
</test>
</suite>