Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface (#10878)
### Motivation
The java-instance.jar generated by the pulsar-functions-runtime-all module should only contain the interfaces that the Pulsar Functions framework uses to interact with user code. The module should only have the following dependencies:
1. pulsar-io-core
2. pulsar-functions-api
3. pulsar-client-api
4. slf4j-api
5. log4j-slf4j-impl
6. log4j-api
7. log4j-core
*Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
### Modifications
Change dep pulsar-client-original to pulsar-client-api
Slight changes in the top level pom for what is included in all sub-modules so that additional deps don't land into java-instance.jar
There is also a fix for an issue introduced by https://github.com/apache/pulsar/pull/9673. The thread context class loader was set incorrectly in ThreadRuntime.
### Future improvements
1. We should also add a test in the future to make sure external libraries don't get added accidentally to this module and java-instance.jar
2. Rename the module "pulsar-functions-runtime-all" to something that describes its function better. The current name can be confusing
(cherry picked from commit d81b5f8b8e6cb17f307ec830accaf9dd95d7643b)
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 57ae871..27d12c4 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -65,6 +65,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper-prometheus-metrics</artifactId>
+ <version>${zookeeper.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-package-bookkeeper-storage</artifactId>
<version>${project.version}</version>
diff --git a/pom.xml b/pom.xml
index f4ce36c..c98a55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1199,13 +1199,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper-prometheus-metrics</artifactId>
- <version>${zookeeper.version}</version>
- </dependency>
</dependencies>
<build>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2f3d32d..4445f64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -83,6 +83,7 @@
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
@@ -135,7 +136,6 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
index 1ebf7f1..3daf920 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
@@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -97,11 +98,12 @@
assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
}
Map<String, String> properties = Maps.newHashMap();
- SchemaInfo schemaInfo = new SchemaInfo();
- schemaInfo.setType(SchemaType.STRING);
- schemaInfo.setProperties(properties);
- schemaInfo.setName("test");
- schemaInfo.setSchema("".getBytes());
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+ .type(SchemaType.STRING)
+ .properties(properties)
+ .name("test")
+ .schema("".getBytes())
+ .build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
@@ -145,11 +147,12 @@
}
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "value1");
- SchemaInfo schemaInfo = new SchemaInfo();
- schemaInfo.setType(SchemaType.STRING);
- schemaInfo.setProperties(properties);
- schemaInfo.setName("test");
- schemaInfo.setSchema("".getBytes());
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+ .type(SchemaType.STRING)
+ .properties(properties)
+ .name("test")
+ .schema("".getBytes())
+ .build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
@@ -174,11 +177,12 @@
}
admin.namespaces().setSchemaValidationEnforced(namespace,true);
Map<String, String> properties = Maps.newHashMap();
- SchemaInfo schemaInfo = new SchemaInfo();
- schemaInfo.setType(SchemaType.STRING);
- schemaInfo.setProperties(properties);
- schemaInfo.setName("test");
- schemaInfo.setSchema("".getBytes());
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+ .type(SchemaType.STRING)
+ .properties(properties)
+ .name("test")
+ .schema("".getBytes())
+ .build();
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
admin.schemas().createSchema(topicName, postSchemaPayload);
try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 04914fe..32a9f9e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -118,11 +119,12 @@
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
JsonSchema schema = schemaGen.generateSchema(pojo);
- SchemaInfo info = new SchemaInfo();
- info.setName("");
- info.setProperties(properties);
- info.setType(SchemaType.JSON);
- info.setSchema(mapper.writeValueAsBytes(schema));
+ SchemaInfo info = SchemaInfoImpl.builder()
+ .name("")
+ .properties(properties)
+ .type(SchemaType.JSON)
+ .schema(mapper.writeValueAsBytes(schema))
+ .build();
return new OldJSONSchema<>(info, pojo, mapper);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d97b989..6e860ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -61,6 +61,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -437,7 +438,7 @@
admin.topics().createPartitionedTopic(topic, 2);
// set schema
- SchemaInfo schemaInfo = SchemaInfo
+ SchemaInfo schemaInfo = SchemaInfoImpl
.builder()
.schema(new byte[0])
.name("dummySchema")
@@ -653,7 +654,7 @@
final Map<String, String> map = new HashMap<>();
map.put("key", null);
map.put(null, "value"); // null key is not allowed for JSON, it's only for test here
- Schema.INT32.getSchemaInfo().setProperties(map);
+ ((SchemaInfoImpl)Schema.INT32.getSchemaInfo()).setProperties(map);
final Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
.subscriptionName("sub")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index d6d96f7..61d8332 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -323,7 +324,7 @@
SchemaCompatibilityStrategy.FULL);
byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
.getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes();
- SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
admin.schemas().createSchema(fqtn, schemaInfo);
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 9a9a4ed..4408ae2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -434,7 +435,7 @@
// the util function converts `GetSchemaResponse` to `SchemaInfo`
static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
GetSchemaResponse response) {
- SchemaInfo info = new SchemaInfo();
+
byte[] schema;
if (response.getType() == SchemaType.KEY_VALUE) {
schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(
@@ -442,11 +443,13 @@
} else {
schema = response.getData().getBytes(UTF_8);
}
- info.setSchema(schema);
- info.setType(response.getType());
- info.setProperties(response.getProperties());
- info.setName(tn.getLocalName());
- return info;
+
+ return SchemaInfoImpl.builder()
+ .schema(schema)
+ .type(response.getType())
+ .properties(response.getProperties())
+ .name(tn.getLocalName())
+ .build();
}
static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn,
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index f2c5860..0070c4c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -18,16 +18,7 @@
*/
package org.apache.pulsar.common.schema;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.util.Base64;
-import java.util.Collections;
import java.util.Map;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -37,55 +28,24 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-@Builder
-public class SchemaInfo {
+public interface SchemaInfo {
- @EqualsAndHashCode.Exclude
- private String name;
+ String getName();
/**
* The schema data in AVRO JSON format.
*/
- private byte[] schema;
+ byte[] getSchema();
/**
* The type of schema (AVRO, JSON, PROTOBUF, etc..).
*/
- private SchemaType type;
+ SchemaType getType();
/**
* Additional properties of the schema definition (implementation defined).
*/
- @Builder.Default
- private Map<String, String> properties = Collections.emptyMap();
+ Map<String, String> getProperties();
- public String getSchemaDefinition() {
- if (null == schema) {
- return "";
- }
-
- switch (type) {
- case AVRO:
- case JSON:
- case PROTOBUF:
- case PROTOBUF_NATIVE:
- return new String(schema, UTF_8);
- case KEY_VALUE:
- KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
- DefaultImplementation.decodeKeyValueSchemaInfo(this);
- return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
- default:
- return Base64.getEncoder().encodeToString(schema);
- }
- }
-
- @Override
- public String toString(){
- return DefaultImplementation.jsonifySchemaInfo(this);
- }
-
+ String getSchemaDefinition();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a7cc68e..bed2b9c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -90,7 +90,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 9108a6e..f2cc169 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -47,7 +47,7 @@
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 3db9554..8971aab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -74,9 +75,9 @@
if (requireSchemaValidation) {
// verify if the message can be decoded by the underlying schema
- if (schema instanceof KeyValueSchemaImpl
- && ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
- ((KeyValueSchemaImpl) schema).getValueSchema().validate(message);
+ if (schema instanceof KeyValueSchema
+ && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+ ((KeyValueSchema) schema).getValueSchema().validate(message);
} else {
schema.validate(message);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
index c66ff43..3b5296e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Boolean")
.setType(SchemaType.BOOLEAN)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index 658e398..ce68298 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -33,7 +33,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("ByteBuf")
.setType(SchemaType.BYTES)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index c560f0e..0ff308f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -34,7 +34,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("ByteBuffer")
.setType(SchemaType.BYTES)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 4e4c27e..6d51687 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("INT8")
.setType(SchemaType.INT8)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 9c7ec37..98a0e66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -31,7 +31,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Bytes")
.setType(SchemaType.BYTES)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
index 295dae6..cbdb912 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -33,7 +33,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Date")
.setType(SchemaType.DATE)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index baa1aac..4b269a6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Double")
.setType(SchemaType.DOUBLE)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index aed905b..84d4073 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Float")
.setType(SchemaType.FLOAT)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
index 5830cea..db33de7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -33,7 +33,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Instant")
.setType(SchemaType.INSTANT)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index fc8338e..dfad280 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("INT32")
.setType(SchemaType.INT32)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4e3b874..9fe6aed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -74,11 +74,11 @@
ObjectMapper objectMapper = new ObjectMapper();
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
- backwardsCompatibleSchemaInfo = new SchemaInfo();
- backwardsCompatibleSchemaInfo.setName("");
- backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties());
- backwardsCompatibleSchemaInfo.setType(SchemaType.JSON);
- backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
+ backwardsCompatibleSchemaInfo = new SchemaInfoImpl()
+ .setName("")
+ .setProperties(schemaInfo.getProperties())
+ .setType(SchemaType.JSON)
+ .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
index add6fd2..18ef3af 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("LocalDate")
.setType(SchemaType.LOCAL_DATE)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index aa86a19..05b2787 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -37,7 +37,7 @@
public static final String DELIMITER = ":";
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("LocalDateTime")
.setType(SchemaType.LOCAL_DATE_TIME)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
index 6e2bf62..e53c620 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("LocalTime")
.setType(SchemaType.LOCAL_TIME)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index f1491f4..deccaf4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("INT64")
.setType(SchemaType.INT64)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 385fc41..9cf753c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -72,11 +72,10 @@
setReader(new ProtobufNativeReader<>(protoMessageInstance));
setWriter(new ProtobufNativeWriter<>());
// update properties with protobuf related properties
- Map<String, String> allProperties = new HashMap<>();
- allProperties.putAll(schemaInfo.getProperties());
// set protobuf parsing info
+ Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties());
allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
- schemaInfo.setProperties(allProperties);
+ ((SchemaInfoImpl)schemaInfo).setProperties(allProperties);
}
private String getParsingInfo(T protoMessageInstance) {
@@ -124,7 +123,7 @@
}
Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo());
- SchemaInfo schemaInfo = SchemaInfo.builder()
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.schema(ProtobufNativeSchemaUtils.serialize(descriptor))
.type(SchemaType.PROTOBUF_NATIVE)
.name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index f7971eb..275cacd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -69,11 +69,10 @@
setReader(new ProtobufReader<>(protoMessageInstance));
setWriter(new ProtobufWriter<>());
// update properties with protobuf related properties
- Map<String, String> allProperties = new HashMap<>();
- allProperties.putAll(schemaInfo.getProperties());
// set protobuf parsing info
+ Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties());
allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
- schemaInfo.setProperties(allProperties);
+ ((SchemaInfoImpl)schemaInfo).setProperties(allProperties);
}
private String getParsingInfo(T protoMessageInstance) {
@@ -111,7 +110,7 @@
+ " is not assignable from " + pojo.getName());
}
- SchemaInfo schemaInfo = SchemaInfo.builder()
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8))
.type(SchemaType.PROTOBUF)
.name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
index ee9f0cb..0fda7d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
@@ -105,7 +105,7 @@
}
baseSchema.setFields(avroFields);
- return new SchemaInfo(
+ return new SchemaInfoImpl(
name,
baseSchema.toString().getBytes(UTF_8),
schemaType,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
new file mode 100644
index 0000000..fb5263e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.experimental.UtilityClass;
+
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.Schema;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Static factory helpers that build {@link SchemaInfo} instances through {@link SchemaInfoImpl#builder()}.
+ */
+@UtilityClass
+public class SchemaInfoUtil {
+ /** Builds a schema info named {@code name} whose bytes, type and properties are read from {@code data}. */
+ public static SchemaInfo newSchemaInfo(String name, SchemaData data) {
+ return SchemaInfoImpl.builder()
+ .name(name)
+ .schema(data.getData())
+ .type(data.getType())
+ .properties(data.getProps())
+ .build();
+ }
+ /** Builds a schema info from a proto {@link Schema}, copying any key/value properties it carries into a map. */
+ public static SchemaInfo newSchemaInfo(Schema schema) {
+ SchemaInfoImpl.SchemaInfoImplBuilder si = SchemaInfoImpl.builder()
+ .name(schema.getName())
+ .schema(schema.getSchemaData())
+ .type(Commands.getSchemaType(schema.getType()));
+ if (schema.getPropertiesCount() == 0) {
+ si.properties(Collections.emptyMap()); // nothing to copy
+ } else {
+ Map<String, String> properties = new TreeMap<>(); // TreeMap: copied entries end up ordered by key
+ for (int i = 0; i < schema.getPropertiesCount(); i++) {
+ KeyValue kv = schema.getPropertyAt(i);
+ properties.put(kv.getKey(), kv.getValue());
+ }
+
+ si.properties(properties);
+ }
+ return si.build();
+ }
+ /** Builds a schema info named {@code name} from a {@link GetSchemaResponse}, UTF-8 encoding its schema string. */
+ public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) {
+ return SchemaInfoImpl.builder()
+ .name(name)
+ .schema(schema.getData().getBytes(StandardCharsets.UTF_8))
+ .type(schema.getType())
+ .properties(schema.getProperties())
+ .build();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index 4014405..bbb5ad6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -32,7 +32,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("INT16")
.setType(SchemaType.INT16)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 7e57f6c..462fa60 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -46,7 +46,7 @@
// Ensure the ordering of the static initialization
CHARSET_KEY = "__charset";
DEFAULT_CHARSET = StandardCharsets.UTF_8;
- DEFAULT_SCHEMA_INFO = new SchemaInfo()
+ DEFAULT_SCHEMA_INFO = new SchemaInfoImpl()
.setName("String")
.setType(SchemaType.STRING)
.setSchema(new byte[0]);
@@ -87,7 +87,7 @@
this.charset = charset;
Map<String, String> properties = new HashMap<>();
properties.put(CHARSET_KEY, charset.name());
- this.schemaInfo = new SchemaInfo()
+ this.schemaInfo = new SchemaInfoImpl()
.setName(DEFAULT_SCHEMA_INFO.getName())
.setType(SchemaType.STRING)
.setSchema(DEFAULT_SCHEMA_INFO.getSchema())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 8cc1868..7ba116f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -105,7 +105,7 @@
}
public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
- return SchemaInfo.builder()
+ return SchemaInfoImpl.builder()
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
.properties(schemaDefinition.getProperties())
.name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
index d56e4da..ab6e1ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -33,7 +33,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Time")
.setType(SchemaType.TIME)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
index 899e159..755b466 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -33,7 +33,7 @@
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfo()
+ SCHEMA_INFO = new SchemaInfoImpl()
.setName("Timestamp")
.setType(SchemaType.TIMESTAMP)
.setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
index 7e1e1c0..70d7fc0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
@@ -23,6 +23,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -47,7 +48,7 @@
}
public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
- return SchemaInfo.builder()
+ return SchemaInfoImpl.builder()
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
.properties(schemaDefinition.getProperties())
.name("")
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index bf01926..a37c886 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -69,6 +69,22 @@
<Class name="org.apache.pulsar.client.impl.schema.BooleanSchema"/>
<Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
</Match>
+
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl$SchemaInfoImplBuilder"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
<Match>
<Class name="~org.apache.pulsar.client.impl.ConsumerImpl.*"/>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
index 994f013..9474b80 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -171,7 +171,7 @@
KeyValueEncodingType.SEPARATED
);
- SchemaInfo oldSchemaInfo = new SchemaInfo()
+ SchemaInfo oldSchemaInfo = new SchemaInfoImpl()
.setName("")
.setType(SchemaType.KEY_VALUE)
.setSchema(kvSchema.getSchemaInfo().getSchema())
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index b0a86c2..bd94bf0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -30,6 +30,7 @@
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -67,28 +68,32 @@
@Test
public void testFillParametersToSchemainfo() {
- AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
- AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
-
- fooSchema.getSchemaInfo().setName("foo");
- fooSchema.getSchemaInfo().setType(SchemaType.AVRO);
Map<String, String> keyProperties = Maps.newTreeMap();
keyProperties.put("foo.key1", "value");
keyProperties.put("foo.key2", "value");
- fooSchema.getSchemaInfo().setProperties(keyProperties);
- barSchema.getSchemaInfo().setName("bar");
- barSchema.getSchemaInfo().setType(SchemaType.AVRO);
+
Map<String, String> valueProperties = Maps.newTreeMap();
valueProperties.put("bar.key", "key");
- barSchema.getSchemaInfo().setProperties(valueProperties);
+
+ AvroSchema<Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<Foo>builder()
+ .withPojo(Foo.class)
+ .withProperties(keyProperties)
+ .build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<Bar>builder()
+ .withPojo(Bar.class)
+ .withProperties(valueProperties)
+ .build());
+
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
- assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo");
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO));
- assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
- assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar");
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"),
+ "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO));
- assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}");
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"),
+ "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"bar.key\":\"key\"}");
}
@Test
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
index 7e34a64..f96e84e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -289,7 +289,7 @@
@Test
public void testUnsetProperties() {
- final SchemaInfo schemaInfo = SchemaInfo.builder()
+ final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.type(SchemaType.STRING)
.schema(new byte[0])
.name("string")
@@ -305,7 +305,7 @@
public void testSetProperties() {
final Map<String, String> map = Maps.newHashMap();
map.put("test", "value");
- final SchemaInfo schemaInfo = SchemaInfo.builder()
+ final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.type(SchemaType.STRING)
.schema(new byte[0])
.name("string")
@@ -322,10 +322,16 @@
public void testNullPropertyValue() {
final Map<String, String> map = new HashMap<>();
map.put("key", null);
- final IntSchema schema = new IntSchema();
- schema.getSchemaInfo().setProperties(map);
+
+ SchemaInfo si = SchemaInfoImpl.builder()
+ .name("INT32")
+ .schema(new byte[0])
+ .type(SchemaType.INT32)
+ .properties(map)
+ .build();
+
// null key will be skipped by Gson when serializing JSON to String
- assertEquals(schema.getSchemaInfo().toString(), INT32_SCHEMA_INFO);
+ assertEquals(si.toString(), INT32_SCHEMA_INFO);
}
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
index b09bf4d..b250322 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
@@ -87,7 +87,7 @@
@Test
public void testSchemaInfoWithoutCharset() {
- SchemaInfo si = new SchemaInfo()
+ SchemaInfo si = new SchemaInfoImpl()
.setName("test-schema-info-without-charset")
.setType(SchemaType.STRING)
.setSchema(new byte[0])
@@ -122,7 +122,7 @@
public void testSchemaInfoWithCharset(Charset charset) {
Map<String, String> properties = new HashMap<>();
properties.put(StringSchema.CHARSET_KEY, charset.name());
- SchemaInfo si = new SchemaInfo()
+ SchemaInfo si = new SchemaInfoImpl()
.setName("test-schema-info-without-charset")
.setType(SchemaType.STRING)
.setSchema(new byte[0])
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index fd29ce7..05028a0 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -183,6 +183,10 @@
<version>1.1.7.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
</dependencies>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
index 0f0429e..c58fef3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
@@ -25,7 +25,6 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import lombok.Value;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
@@ -47,15 +46,17 @@
return new AuthPoliciesImplBuilder();
}
- private static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
+
+ public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
private Map<String, Set<AuthAction>> namespaceAuthentication = new TreeMap<>();
- private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();
- private Map<String, Set<String>> subscriptionAuthentication = new TreeMap<>();
+ private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>(); // empty until set
+ private Map<String, Set<String>> subscriptionAuthentication = new TreeMap<>(); // empty until set
AuthPoliciesImplBuilder() {
}
- public AuthPoliciesImplBuilder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication) {
+ public AuthPoliciesImplBuilder namespaceAuthentication(
+ Map<String, Set<AuthAction>> namespaceAuthentication) {
this.namespaceAuthentication = namespaceAuthentication;
return this;
}
@@ -66,7 +67,8 @@
return this;
}
- public AuthPoliciesImplBuilder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication) {
+ public AuthPoliciesImplBuilder subscriptionAuthentication(
+ Map<String, Set<String>> subscriptionAuthentication) {
this.subscriptionAuthentication = subscriptionAuthentication;
return this;
}
@@ -74,5 +76,11 @@
public AuthPoliciesImpl build() {
return new AuthPoliciesImpl(namespaceAuthentication, topicAuthentication, subscriptionAuthentication);
}
+
+ public String toString() {
+ return "AuthPoliciesImpl.AuthPoliciesImplBuilder(namespaceAuthentication=" + this.namespaceAuthentication
+ + ", topicAuthentication=" + this.topicAuthentication + ", subscriptionAuthentication="
+ + this.subscriptionAuthentication + ")";
+ }
}
}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
similarity index 96%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 9573526..5f36909 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -46,7 +46,7 @@
@Override
public SchemaInfo getSchemaInfo() {
- return BytesSchema.BYTES.getSchemaInfo();
+ return Schema.BYTES.getSchemaInfo();
}
@Override
@@ -169,11 +169,12 @@
properties.put(KV_ENCODING_TYPE, String.valueOf(keyValueEncodingType));
// generate the final schema info
- return new SchemaInfo()
- .setName(schemaName)
- .setType(SchemaType.KEY_VALUE)
- .setSchema(schemaData)
- .setProperties(properties);
+ return SchemaInfoImpl.builder()
+ .name(schemaName)
+ .type(SchemaType.KEY_VALUE)
+ .schema(schemaData)
+ .properties(properties)
+ .build();
}
private static void encodeSubSchemaInfoToParentSchemaProperties(SchemaInfo schemaInfo,
@@ -237,7 +238,7 @@
} else {
schemaProps = SchemaUtils.deserializeSchemaProperties(schemaPropsStr);
}
- return SchemaInfo.builder()
+ return SchemaInfoImpl.builder()
.name(schemaName)
.type(schemaType)
.schema(schemaData)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
new file mode 100644
index 0000000..ca8b6cc
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * Lombok-backed implementation of {@link SchemaInfo} holding a schema's name, raw bytes, type and properties.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
+@Builder
+public class SchemaInfoImpl implements SchemaInfo {
+ /** The schema name; excluded from the generated equals() and hashCode(). */
+ @EqualsAndHashCode.Exclude
+ private String name;
+
+ /**
+ * The raw schema bytes; {@link #getSchemaDefinition()} picks how to render them based on {@link #type}.
+ */
+ private byte[] schema;
+
+ /**
+ * The type of schema (AVRO, JSON, PROTOBUF, etc.).
+ */
+ private SchemaType type;
+
+ /**
+ * Additional properties of the schema definition (implementation defined).
+ */
+ @Builder.Default
+ private Map<String, String> properties = Collections.emptyMap();
+ /** Returns the schema as text: an empty string when no schema bytes are set, otherwise a per-type rendering. */
+ public String getSchemaDefinition() {
+ if (null == schema) {
+ return "";
+ }
+ // NOTE(review): schema set but type null makes this switch throw NullPointerException - confirm type is always set.
+ switch (type) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF:
+ case PROTOBUF_NATIVE:
+ return new String(schema, UTF_8); // these types are rendered by decoding the bytes as UTF-8
+ case KEY_VALUE: // decode the key and value schema infos, then render the pair via SchemaUtils
+ KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(this);
+ return SchemaUtils.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
+ default:
+ return Base64.getEncoder().encodeToString(schema); // every other type: Base64 text of the raw bytes
+ }
+ }
+ /** Returns the string produced by {@code SchemaUtils.jsonifySchemaInfo} for this instance. */
+ @Override
+ public String toString() {
+ return SchemaUtils.jsonifySchemaInfo(this);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
similarity index 100%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index 5c00f06..d5b4405 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -22,6 +22,7 @@
import java.util.Map;
import lombok.Builder;
import lombok.Data;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -45,7 +46,7 @@
* @return the converted schema info.
*/
public SchemaInfo toSchemaInfo() {
- return SchemaInfo.builder()
+ return SchemaInfoImpl.builder()
.name("")
.type(type)
.schema(data)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
deleted file mode 100644
index ac5997d..0000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.protocol.schema;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.TreeMap;
-import lombok.experimental.UtilityClass;
-
-import org.apache.pulsar.common.api.proto.KeyValue;
-import org.apache.pulsar.common.api.proto.Schema;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.schema.SchemaInfo;
-
-/**
- * Class helping to initialize schemas.
- */
-@UtilityClass
-public class SchemaInfoUtil {
-
- public static SchemaInfo newSchemaInfo(String name, SchemaData data) {
- SchemaInfo si = new SchemaInfo();
- si.setName(name);
- si.setSchema(data.getData());
- si.setType(data.getType());
- si.setProperties(data.getProps());
- return si;
- }
-
- public static SchemaInfo newSchemaInfo(Schema schema) {
- SchemaInfo si = new SchemaInfo();
- si.setName(schema.getName());
- si.setSchema(schema.getSchemaData());
- si.setType(Commands.getSchemaType(schema.getType()));
- if (schema.getPropertiesCount() == 0) {
- si.setProperties(Collections.emptyMap());
- } else {
- si.setProperties(new TreeMap<>());
- for (int i = 0; i < schema.getPropertiesCount(); i++) {
- KeyValue kv = schema.getPropertyAt(i);
- si.getProperties().put(kv.getKey(), kv.getValue());
- }
- }
- return si;
- }
-
- public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) {
- SchemaInfo si = new SchemaInfo();
- si.setName(name);
- si.setSchema(schema.getData().getBytes(StandardCharsets.UTF_8));
- si.setType(schema.getType());
- si.setProperties(schema.getProperties());
- return si;
- }
-}
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 169985b..bf700783 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -30,6 +30,19 @@
<relativePath>..</relativePath>
</parent>
+ <!--
+
+ THIS MODULE SHOULD ONLY CONTAIN THE INTERFACES THAT PULSAR FUNCTION'S FRAMEWORK USES TO INTERACT WITH USER CODE.
+ THIS MODULE SHOULD ONLY DEPEND ON:
+ 1. pulsar-io-core
+ 2. pulsar-functions-api
+ 3. pulsar-client-api
+ 4. slf4j-api
+ 5. log4j-slf4j-impl
+ 6. log4j-api
+ 7. log4j-core
+ -->
+
<artifactId>pulsar-functions-runtime-all</artifactId>
<name>Pulsar Functions :: Runtime All</name>
@@ -48,7 +61,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-original</artifactId>
+ <artifactId>pulsar-client-api</artifactId>
<version>${project.version}</version>
</dependency>
@@ -77,6 +90,19 @@
<build>
<plugins>
+ <!--
+ Disable the maven-jar-plugin since we don't need the default jar and it conflicts with the maven-assembly-plugin
+ -->
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-jar</id>
+ <phase/>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
@@ -92,7 +118,7 @@
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
- <phase>package</phase>
+ <phase>compile</phase>
<goals>
<goal>single</goal>
</goals>
diff --git a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
index bd64bf7..6852792 100644
--- a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
+++ b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
@@ -36,6 +36,7 @@
* This class will create three classloaders:
* 1. The root classloader that will share interfaces between the function instance
* classloader and user code classloader. This classloader will contain the following dependencies
+ * - pulsar-io-core
* - pulsar-functions-api
* - pulsar-client-api
* - log4j-slf4j-impl
diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
new file mode 100644
index 0000000..3bdd23f
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Verifies that java-instance.jar bundles only the classes it is allowed to ship.
+ * That jar should only contain the interfaces that the Pulsar Functions framework uses to interact with user code,
+ * which means classes from the following libraries:
+ * 1. pulsar-io-core
+ * 2. pulsar-functions-api
+ * 3. pulsar-client-api
+ * 4. slf4j-api
+ * 5. log4j-slf4j-impl
+ * 6. log4j-api
+ * 7. log4j-core
+ */
+@Slf4j
+public class JavaInstanceDepsTest {
+
+    @Test
+    public void testInstanceJarDeps() throws IOException {
+        // Relative path: resolved against the working directory the test runs in.
+        File jar = new File("target/java-instance.jar");
+
+        @Cleanup
+        ZipInputStream zip = new ZipInputStream(jar.toURI().toURL().openStream());
+
+        List<String> notAllowedClasses = new LinkedList<>();
+        // getNextEntry() returns null once every entry of the archive has been read.
+        for (ZipEntry e = zip.getNextEntry(); e != null; e = zip.getNextEntry()) {
+            String name = e.getName();
+            if (name.endsWith(".class") && !name.startsWith("META-INF")) {
+                // The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes
+                // filter out those classes to see if there are any other classes that should not be allowed
+                if (!name.startsWith("org/apache/pulsar")
+                        && !name.startsWith("org/slf4j")
+                        && !name.startsWith("org/apache/logging/slf4j")
+                        && !name.startsWith("org/apache/logging/log4j")) {
+                    notAllowedClasses.add(name);
+                }
+            }
+        }
+
+        // On failure the offending entry names are reported as the assertion message.
+        Assert.assertEquals(notAllowedClasses, Collections.emptyList(), notAllowedClasses.toString());
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index e0a9c64..474410c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -179,7 +179,6 @@
String.format("%s-%s",
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
instanceConfig.getInstanceId()));
- this.fnThread.setContextClassLoader(functionClassLoader);
this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
index 2db9d6c..ba57692 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
@@ -27,6 +27,7 @@
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -44,7 +45,7 @@
Map<String, String> props = new HashMap<>();
boolean isJsonConverter = converter instanceof JsonConverter;
props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
- this.schemaInfo = SchemaInfo.builder()
+ this.schemaInfo = SchemaInfoImpl.builder()
.name(isJsonConverter? "KafKaJson" : "KafkaAvro")
.type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO)
.schema(schema.toString().getBytes(UTF_8))
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
index eda8c96..2a9e1c4 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -25,6 +25,7 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -65,7 +66,7 @@
org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
String definition = schema.toString(false);
log.info("Schema {} definition {}", schemaId, definition);
- SchemaInfo schemaInfo = SchemaInfo.builder()
+ SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.type(SchemaType.AVRO)
.name(schema.getName())
.properties(Collections.emptyMap())
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 81bf031..47bedbe 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -42,6 +42,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -235,7 +236,7 @@
static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
DeferredSchemaPlaceholder() {
- super(SchemaInfo
+ super(SchemaInfoImpl
.builder()
.type(SchemaType.AVRO)
.properties(Collections.emptyMap())
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index b155e45..3f0bc09 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -450,7 +450,6 @@
* Apache Zookeeper
- zookeeper-3.6.3.jar
- zookeeper-jute-3.6.3.jar
- - zookeeper-prometheus-metrics-3.6.3.jar
* Apache Yetus Audience Annotations
- audience-annotations-0.5.0.jar
* Swagger
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 03a6b77..645edbd 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -101,7 +102,7 @@
this.offloadPolicies = offloadPolicies;
ObjectMapper objectMapper = new ObjectMapper();
- this.schemaInfo = SchemaInfo.builder()
+ this.schemaInfo = SchemaInfoImpl.builder()
.name(originSchemaName)
.type(schemaType)
.schema(schema.getBytes("ISO8859-1"))
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index cd31b4b..79fb789 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -23,6 +23,7 @@
import io.prestosql.spi.connector.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -188,9 +189,10 @@
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
- SchemaInfo badSchemaInfo = new SchemaInfo();
- badSchemaInfo.setSchema(new byte[0]);
- badSchemaInfo.setType(SchemaType.AVRO);
+ SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+ .schema(new byte[0])
+ .type(SchemaType.AVRO)
+ .build();
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
@@ -214,9 +216,10 @@
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
- SchemaInfo badSchemaInfo = new SchemaInfo();
- badSchemaInfo.setSchema("foo".getBytes());
- badSchemaInfo.setType(SchemaType.AVRO);
+ SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+ .schema("foo".getBytes())
+ .type(SchemaType.AVRO)
+ .build();
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
index f4810ba..c1b97d3 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
@@ -22,6 +22,7 @@
import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.sql.presto.PulsarColumnHandle;
@@ -65,7 +66,7 @@
public void testPrimitiveType() {
byte int8Value = 1;
- SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
+ SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build();
Schema schemaInt8 = Schema.getSchema(schemaInfoInt8);
List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8,
@@ -77,7 +78,7 @@
PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value);
short int16Value = 2;
- SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build();
+ SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build();
Schema schemaInt16 = Schema.getSchema(schemaInfoInt16);
List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16,
@@ -89,7 +90,7 @@
PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value);
int int32Value = 2;
- SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build();
+ SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build();
Schema schemaInt32 = Schema.getSchema(schemaInfoInt32);
List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -102,7 +103,7 @@
PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value);
long int64Value = 2;
- SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build();
+ SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build();
Schema schemaInt64 = Schema.getSchema(schemaInfoInt64);
List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -116,7 +117,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), int64Value);
String stringValue = "test";
- SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build();
+ SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build();
Schema schemaString = Schema.getSchema(schemaInfoString);
List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -130,7 +131,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), stringValue);
float floatValue = 0.2f;
- SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build();
+ SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build();
Schema schemaFloat = Schema.getSchema(schemaInfoFloat);
List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -144,7 +145,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue)));
double doubleValue = 0.22d;
- SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build();
+ SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build();
Schema schemaDouble = Schema.getSchema(schemaInfoDouble);
List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -158,7 +159,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue);
boolean booleanValue = true;
- SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build();
+ SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build();
Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean);
List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -173,7 +174,7 @@
byte[] bytesValue = new byte[1];
bytesValue[0] = 1;
- SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build();
+ SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build();
Schema schemaBytes = Schema.getSchema(schemaInfoBytes);
List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -187,7 +188,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue));
Date dateValue = new Date(System.currentTimeMillis());
- SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build();
+ SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build();
Schema schemaDate = Schema.getSchema(schemaInfoDate);
List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -201,7 +202,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime());
Time timeValue = new Time(System.currentTimeMillis());
- SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
+ SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build();
Schema schemaTime = Schema.getSchema(schemaInfoTime);
List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -215,7 +216,7 @@
PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime());
Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
- SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
+ SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build();
Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp);
List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index bb5190e..140168d 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -36,7 +36,7 @@
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-original</artifactId>
+ <artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
@@ -82,24 +82,10 @@
</transformers>
<artifactSet>
<includes>
- <include>org.apache.pulsar:pulsar-client-original</include>
- <include>org.apache.pulsar:pulsar-client-api</include>
- <include>org.apache.pulsar:pulsar-client-admin-api</include>
+ <include>org.apache.pulsar:pulsar-client</include>
<include>org.apache.pulsar:pulsar-functions-api-examples</include>
</includes>
</artifactSet>
- <filters>
- <filter>
- <artifact>org.apache.pulsar:pulsar-client-original</artifact>
- <includes>
- <include>**</include>
- </includes>
- <excludes>
- <!-- bouncycastle jars could not be shaded, or the signatures will be wrong-->
- <exclude>org/bouncycastle/**</exclude>
- </excludes>
- </filter>
- </filters>
</configuration>
</execution>
</executions>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
index e0cc328..3c961fa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
@@ -45,6 +45,7 @@
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
import org.testng.annotations.Test;
/**
@@ -59,42 +60,45 @@
String outputTopicName = "test-state-source-output-" + randomName(8);
String sourceName = "test-state-source-" + randomName(8);
int numMessages = 10;
+ try {
+ submitSourceConnector(
+ sourceName,
+ outputTopicName,
+ "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR);
- submitSourceConnector(
- sourceName,
- outputTopicName,
- "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR);
+ // get source info
+ getSourceInfoSuccess(container, sourceName);
- // get source info
- getSourceInfoSuccess(container, sourceName);
+ // get source status
+ getSourceStatus(container, sourceName);
- // get source status
- getSourceStatus(container, sourceName);
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
- try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+ retryStrategically((test) -> {
+ try {
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ return status.getInstances().size() > 0
+ && status.getInstances().get(0).getStatus().numWritten >= 10;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 200);
- retryStrategically((test) -> {
- try {
- SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
- return status.getInstances().size() > 0
- && status.getInstances().get(0).getStatus().numWritten >= 10;
- } catch (PulsarAdminException e) {
- return false;
- }
- }, 10, 200);
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ assertEquals(status.getInstances().size(), 1);
+ assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
+ }
- SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
- assertEquals(status.getInstances().size(), 1);
- assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
+ consumeMessages(container, outputTopicName, numMessages);
+
+ // delete source
+ deleteSource(container, sourceName);
+
+ getSourceInfoNotFound(container, sourceName);
+ } finally {
+ dumpFunctionLogs(sourceName);
}
- consumeMessages(container, outputTopicName, numMessages);
-
- // delete source
- deleteSource(container, sourceName);
-
- getSourceInfoNotFound(container, sourceName);
-
}
private void submitSourceConnector(String sourceName,
@@ -129,15 +133,35 @@
}
private static void getSourceStatus(StandaloneContainer container,String sourceName) throws Exception {
+ retryStrategically((test) -> {
+ try {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+
+ if (result.getStdout().contains("\"running\" : true")) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("Encountered error when getting source status", e);
+ return false;
+ }
+ }, 10, 200);
+
ContainerExecResult result = container.execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "sources",
- "status",
- "--tenant", "public",
- "--namespace", "default",
- "--name", sourceName
- );
- assertTrue(result.getStdout().contains("\"running\" : true"));
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+
+ Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
}
private static void consumeMessages(StandaloneContainer container, String outputTopic,