Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface (#10878)

### Motivation

The java-instance.jar generated by the pulsar-functions-runtime-all module should only contain interfaces that Pulsar Function's framework uses to interact with user code.  The module should on have the following dependencies
    1. pulsar-io-core
    2. pulsar-functions-api
    3. pulsar-client-api
    4. slf4j-api
    5. log4j-slf4j-impl
    6. log4j-api
    7. log4j-core

*Explain here the context, and why you're making that change. What is the problem you're trying to solve.*

### Modifications

Change dep pulsar-client-original to pulsar-client-api

Slight changes in the top level pom for what is included in all sub-modules so that additional deps don't land into java-instance.jar

There is also a fix for an issue introduced by https://github.com/apache/pulsar/pull/9673. The thread context class loader was set incorrectly in ThreadRuntime.

### Future improvements

1. We should also add a test in the future to make sure external libraries don't get add accidentally this module and java-instance.jar

2. Rename the module "pulsar-functions-runtime-all" to something that describes its function better.  The current name can be confusing


(cherry picked from commit d81b5f8b8e6cb17f307ec830accaf9dd95d7643b)
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 57ae871..27d12c4 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -65,6 +65,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper-prometheus-metrics</artifactId>
+      <version>${zookeeper.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-package-bookkeeper-storage</artifactId>
       <version>${project.version}</version>
diff --git a/pom.xml b/pom.xml
index f4ce36c..c98a55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1199,13 +1199,12 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper-prometheus-metrics</artifactId>
-      <version>${zookeeper.version}</version>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2f3d32d..4445f64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -83,6 +83,7 @@
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -135,7 +136,6 @@
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
index 1ebf7f1..3daf920 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -97,11 +98,12 @@
             assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
         }
         Map<String, String> properties = Maps.newHashMap();
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setProperties(properties);
-        schemaInfo.setName("test");
-        schemaInfo.setSchema("".getBytes());
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+                .type(SchemaType.STRING)
+                .properties(properties)
+                .name("test")
+                .schema("".getBytes())
+                .build();
         PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
         admin.schemas().createSchema(topicName, postSchemaPayload);
         try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
@@ -145,11 +147,12 @@
         }
         Map<String, String> properties = Maps.newHashMap();
         properties.put("key1", "value1");
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setProperties(properties);
-        schemaInfo.setName("test");
-        schemaInfo.setSchema("".getBytes());
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+                .type(SchemaType.STRING)
+                .properties(properties)
+                .name("test")
+                .schema("".getBytes())
+                .build();
         PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
         admin.schemas().createSchema(topicName, postSchemaPayload);
         try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
@@ -174,11 +177,12 @@
         }
         admin.namespaces().setSchemaValidationEnforced(namespace,true);
         Map<String, String> properties = Maps.newHashMap();
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setProperties(properties);
-        schemaInfo.setName("test");
-        schemaInfo.setSchema("".getBytes());
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+                .type(SchemaType.STRING)
+                .properties(properties)
+                .name("test")
+                .schema("".getBytes())
+                .build();
         PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
         admin.schemas().createSchema(topicName, postSchemaPayload);
         try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 04914fe..32a9f9e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -33,6 +33,7 @@
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -118,11 +119,12 @@
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
             JsonSchema schema = schemaGen.generateSchema(pojo);
 
-            SchemaInfo info = new SchemaInfo();
-            info.setName("");
-            info.setProperties(properties);
-            info.setType(SchemaType.JSON);
-            info.setSchema(mapper.writeValueAsBytes(schema));
+            SchemaInfo info = SchemaInfoImpl.builder()
+                    .name("")
+                    .properties(properties)
+                    .type(SchemaType.JSON)
+                    .schema(mapper.writeValueAsBytes(schema))
+                    .build();
             return new OldJSONSchema<>(info, pojo, mapper);
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d97b989..6e860ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -61,6 +61,7 @@
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -437,7 +438,7 @@
         admin.topics().createPartitionedTopic(topic, 2);
 
         // set schema
-        SchemaInfo schemaInfo = SchemaInfo
+        SchemaInfo schemaInfo = SchemaInfoImpl
                 .builder()
                 .schema(new byte[0])
                 .name("dummySchema")
@@ -653,7 +654,7 @@
         final Map<String, String> map = new HashMap<>();
         map.put("key", null);
         map.put(null, "value"); // null key is not allowed for JSON, it's only for test here
-        Schema.INT32.getSchemaInfo().setProperties(map);
+        ((SchemaInfoImpl)Schema.INT32.getSchemaInfo()).setProperties(map);
 
         final Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
                 .subscriptionName("sub")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index d6d96f7..61d8332 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -323,7 +324,7 @@
                 SchemaCompatibilityStrategy.FULL);
         byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
                 .getSchemaInfo().getSchema(), UTF_8) + "/n   /n   /n").getBytes();
-        SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
         admin.schemas().createSchema(fqtn, schemaInfo);
 
         admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 9a9a4ed..4408ae2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -434,7 +435,7 @@
     // the util function converts `GetSchemaResponse` to `SchemaInfo`
     static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
                                                            GetSchemaResponse response) {
-        SchemaInfo info = new SchemaInfo();
+
         byte[] schema;
         if (response.getType() == SchemaType.KEY_VALUE) {
             schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(
@@ -442,11 +443,13 @@
         } else {
             schema = response.getData().getBytes(UTF_8);
         }
-        info.setSchema(schema);
-        info.setType(response.getType());
-        info.setProperties(response.getProperties());
-        info.setName(tn.getLocalName());
-        return info;
+
+        return SchemaInfoImpl.builder()
+                .schema(schema)
+                .type(response.getType())
+                .properties(response.getProperties())
+                .name(tn.getLocalName())
+                .build();
     }
 
     static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn,
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index f2c5860..0070c4c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -18,16 +18,7 @@
  */
 package org.apache.pulsar.common.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.util.Base64;
-import java.util.Collections;
 import java.util.Map;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -37,55 +28,24 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-@Builder
-public class SchemaInfo {
+public interface SchemaInfo {
 
-    @EqualsAndHashCode.Exclude
-    private String name;
+    String getName();
 
     /**
      * The schema data in AVRO JSON format.
      */
-    private byte[] schema;
+    byte[] getSchema();
 
     /**
      * The type of schema (AVRO, JSON, PROTOBUF, etc..).
      */
-    private SchemaType type;
+    SchemaType getType();
 
     /**
      * Additional properties of the schema definition (implementation defined).
      */
-    @Builder.Default
-    private Map<String, String> properties = Collections.emptyMap();
+    Map<String, String> getProperties();
 
-    public String getSchemaDefinition() {
-        if (null == schema) {
-            return "";
-        }
-
-        switch (type) {
-            case AVRO:
-            case JSON:
-            case PROTOBUF:
-            case PROTOBUF_NATIVE:
-                return new String(schema, UTF_8);
-            case KEY_VALUE:
-                KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
-                    DefaultImplementation.decodeKeyValueSchemaInfo(this);
-                return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
-            default:
-                return Base64.getEncoder().encodeToString(schema);
-        }
-    }
-
-    @Override
-    public String toString(){
-        return DefaultImplementation.jsonifySchemaInfo(this);
-    }
-
+    String getSchemaDefinition();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a7cc68e..bed2b9c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -90,7 +90,7 @@
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 9108a6e..f2cc169 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -47,7 +47,7 @@
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 3db9554..8971aab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -74,9 +75,9 @@
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            if (schema instanceof KeyValueSchemaImpl
-                    && ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-                ((KeyValueSchemaImpl) schema).getValueSchema().validate(message);
+            if (schema instanceof KeyValueSchema
+                    && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchema) schema).getValueSchema().validate(message);
             } else {
                 schema.validate(message);
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
index c66ff43..3b5296e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
                 .setName("Boolean")
                 .setType(SchemaType.BOOLEAN)
                 .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index 658e398..ce68298 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -33,7 +33,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("ByteBuf")
             .setType(SchemaType.BYTES)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index c560f0e..0ff308f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -34,7 +34,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("ByteBuffer")
             .setType(SchemaType.BYTES)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 4e4c27e..6d51687 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT8")
             .setType(SchemaType.INT8)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 9c7ec37..98a0e66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -31,7 +31,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("Bytes")
             .setType(SchemaType.BYTES)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
index 295dae6..cbdb912 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -33,7 +33,7 @@
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Date")
              .setType(SchemaType.DATE)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index baa1aac..4b269a6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("Double")
             .setType(SchemaType.DOUBLE)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index aed905b..84d4073 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
                 .setName("Float")
                 .setType(SchemaType.FLOAT)
                 .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
index 5830cea..db33de7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -33,7 +33,7 @@
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Instant")
              .setType(SchemaType.INSTANT)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index fc8338e..dfad280 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT32")
             .setType(SchemaType.INT32)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4e3b874..9fe6aed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -74,11 +74,11 @@
             ObjectMapper objectMapper = new ObjectMapper();
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
             JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
-            backwardsCompatibleSchemaInfo = new SchemaInfo();
-            backwardsCompatibleSchemaInfo.setName("");
-            backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties());
-            backwardsCompatibleSchemaInfo.setType(SchemaType.JSON);
-            backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
+            backwardsCompatibleSchemaInfo = new SchemaInfoImpl()
+                    .setName("")
+                    .setProperties(schemaInfo.getProperties())
+                    .setType(SchemaType.JSON)
+                    .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
         } catch (JsonProcessingException ex) {
             throw new RuntimeException(ex);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
index add6fd2..18ef3af 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -32,7 +32,7 @@
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("LocalDate")
              .setType(SchemaType.LOCAL_DATE)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index aa86a19..05b2787 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -37,7 +37,7 @@
    public static final String DELIMITER = ":";
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("LocalDateTime")
              .setType(SchemaType.LOCAL_DATE_TIME)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
index 6e2bf62..e53c620 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -32,7 +32,7 @@
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("LocalTime")
              .setType(SchemaType.LOCAL_TIME)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index f1491f4..deccaf4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT64")
             .setType(SchemaType.INT64)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 385fc41..9cf753c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -72,11 +72,10 @@
         setReader(new ProtobufNativeReader<>(protoMessageInstance));
         setWriter(new ProtobufNativeWriter<>());
         // update properties with protobuf related properties
-        Map<String, String> allProperties = new HashMap<>();
-        allProperties.putAll(schemaInfo.getProperties());
         // set protobuf parsing info
+        Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties());
         allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
-        schemaInfo.setProperties(allProperties);
+        ((SchemaInfoImpl)schemaInfo).setProperties(allProperties);
     }
 
     private String getParsingInfo(T protoMessageInstance) {
@@ -124,7 +123,7 @@
         }
         Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo());
 
-        SchemaInfo schemaInfo = SchemaInfo.builder()
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                 .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
                 .type(SchemaType.PROTOBUF_NATIVE)
                 .name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index f7971eb..275cacd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -69,11 +69,10 @@
         setReader(new ProtobufReader<>(protoMessageInstance));
         setWriter(new ProtobufWriter<>());
         // update properties with protobuf related properties
-        Map<String, String> allProperties = new HashMap<>();
-        allProperties.putAll(schemaInfo.getProperties());
         // set protobuf parsing info
+        Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties());
         allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
-        schemaInfo.setProperties(allProperties);
+        ((SchemaInfoImpl)schemaInfo).setProperties(allProperties);
     }
 
     private String getParsingInfo(T protoMessageInstance) {
@@ -111,7 +110,7 @@
                     + " is not assignable from " + pojo.getName());
         }
 
-            SchemaInfo schemaInfo = SchemaInfo.builder()
+            SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8))
                     .type(SchemaType.PROTOBUF)
                     .name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
index ee9f0cb..0fda7d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
@@ -105,7 +105,7 @@
         }
 
         baseSchema.setFields(avroFields);
-        return new SchemaInfo(
+        return new SchemaInfoImpl(
             name,
             baseSchema.toString().getBytes(UTF_8),
             schemaType,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
new file mode 100644
index 0000000..fb5263e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.experimental.UtilityClass;
+
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.Schema;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Class helping to initialize schemas.
+ */
+@UtilityClass
+public class SchemaInfoUtil {
+
+    public static SchemaInfo newSchemaInfo(String name, SchemaData data) {
+        return SchemaInfoImpl.builder()
+                .name(name)
+                .schema(data.getData())
+                .type(data.getType())
+                .properties(data.getProps())
+                .build();
+    }
+
+    public static SchemaInfo newSchemaInfo(Schema schema) {
+        SchemaInfoImpl.SchemaInfoImplBuilder si = SchemaInfoImpl.builder()
+                .name(schema.getName())
+                .schema(schema.getSchemaData())
+                .type(Commands.getSchemaType(schema.getType()));
+        if (schema.getPropertiesCount() == 0) {
+            si.properties(Collections.emptyMap());
+        } else {
+            Map<String, String> properties = new TreeMap<>();
+            for (int i = 0; i < schema.getPropertiesCount(); i++) {
+                KeyValue kv = schema.getPropertyAt(i);
+                properties.put(kv.getKey(), kv.getValue());
+            }
+
+            si.properties(properties);
+        }
+        return si.build();
+    }
+
+    public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) {
+        return SchemaInfoImpl.builder()
+                .name(name)
+                .schema(schema.getData().getBytes(StandardCharsets.UTF_8))
+                .type(schema.getType())
+                .properties(schema.getProperties())
+                .build();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index 4014405..bbb5ad6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -32,7 +32,7 @@
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT16")
             .setType(SchemaType.INT16)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 7e57f6c..462fa60 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -46,7 +46,7 @@
         // Ensure the ordering of the static initialization
         CHARSET_KEY = "__charset";
         DEFAULT_CHARSET = StandardCharsets.UTF_8;
-        DEFAULT_SCHEMA_INFO = new SchemaInfo()
+        DEFAULT_SCHEMA_INFO = new SchemaInfoImpl()
                 .setName("String")
                 .setType(SchemaType.STRING)
                 .setSchema(new byte[0]);
@@ -87,7 +87,7 @@
         this.charset = charset;
         Map<String, String> properties = new HashMap<>();
         properties.put(CHARSET_KEY, charset.name());
-        this.schemaInfo = new SchemaInfo()
+        this.schemaInfo = new SchemaInfoImpl()
                 .setName(DEFAULT_SCHEMA_INFO.getName())
                 .setType(SchemaType.STRING)
                 .setSchema(DEFAULT_SCHEMA_INFO.getSchema())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 8cc1868..7ba116f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -105,7 +105,7 @@
     }
 
     public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
                 .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
                 .properties(schemaDefinition.getProperties())
                 .name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
index d56e4da..ab6e1ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -33,7 +33,7 @@
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Time")
              .setType(SchemaType.TIME)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
index 899e159..755b466 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -33,7 +33,7 @@
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Timestamp")
              .setType(SchemaType.TIMESTAMP)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
index 7e1e1c0..70d7fc0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
@@ -23,6 +23,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -47,7 +48,7 @@
     }
 
     public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
                 .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
                 .properties(schemaDefinition.getProperties())
                 .name("")
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index bf01926..a37c886 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -69,6 +69,22 @@
         <Class name="org.apache.pulsar.client.impl.schema.BooleanSchema"/>
         <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
     </Match>
+    
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+
+    <Match>
+      <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl$SchemaInfoImplBuilder"/>
+      <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+    
 
     <Match>
         <Class name="~org.apache.pulsar.client.impl.ConsumerImpl.*"/>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
index 994f013..9474b80 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -171,7 +171,7 @@
             KeyValueEncodingType.SEPARATED
         );
 
-        SchemaInfo oldSchemaInfo = new SchemaInfo()
+        SchemaInfo oldSchemaInfo = new SchemaInfoImpl()
             .setName("")
             .setType(SchemaType.KEY_VALUE)
             .setSchema(kvSchema.getSchemaInfo().getSchema())
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index b0a86c2..bd94bf0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -67,28 +68,32 @@
 
     @Test
     public void testFillParametersToSchemainfo() {
-        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
-
-        fooSchema.getSchemaInfo().setName("foo");
-        fooSchema.getSchemaInfo().setType(SchemaType.AVRO);
         Map<String, String> keyProperties = Maps.newTreeMap();
         keyProperties.put("foo.key1", "value");
         keyProperties.put("foo.key2", "value");
-        fooSchema.getSchemaInfo().setProperties(keyProperties);
-        barSchema.getSchemaInfo().setName("bar");
-        barSchema.getSchemaInfo().setType(SchemaType.AVRO);
+
         Map<String, String> valueProperties = Maps.newTreeMap();
         valueProperties.put("bar.key", "key");
-        barSchema.getSchemaInfo().setProperties(valueProperties);
+
+        AvroSchema<Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<Foo>builder()
+                        .withPojo(Foo.class)
+                        .withProperties(keyProperties)
+                        .build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<Bar>builder()
+                        .withPojo(Bar.class)
+                        .withProperties(valueProperties)
+                        .build());
+
         Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
 
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo");
         assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO));
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"),
+                "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
         assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO));
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"),
+                "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"bar.key\":\"key\"}");
     }
 
     @Test
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
index 7e34a64..f96e84e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -289,7 +289,7 @@
 
         @Test
         public void testUnsetProperties() {
-            final SchemaInfo schemaInfo = SchemaInfo.builder()
+            final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .type(SchemaType.STRING)
                     .schema(new byte[0])
                     .name("string")
@@ -305,7 +305,7 @@
         public void testSetProperties() {
             final Map<String, String> map = Maps.newHashMap();
             map.put("test", "value");
-            final SchemaInfo schemaInfo = SchemaInfo.builder()
+            final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .type(SchemaType.STRING)
                     .schema(new byte[0])
                     .name("string")
@@ -322,10 +322,16 @@
         public void testNullPropertyValue() {
             final Map<String, String> map = new HashMap<>();
             map.put("key", null);
-            final IntSchema schema = new IntSchema();
-            schema.getSchemaInfo().setProperties(map);
+
+            SchemaInfo si = SchemaInfoImpl.builder()
+                    .name("INT32")
+                    .schema(new byte[0])
+                    .type(SchemaType.INT32)
+                    .properties(map)
+                    .build();
+
             // null key will be skipped by Gson when serializing JSON to String
-            assertEquals(schema.getSchemaInfo().toString(), INT32_SCHEMA_INFO);
+            assertEquals(si.toString(), INT32_SCHEMA_INFO);
         }
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
index b09bf4d..b250322 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
@@ -87,7 +87,7 @@
 
     @Test
     public void testSchemaInfoWithoutCharset() {
-        SchemaInfo si = new SchemaInfo()
+        SchemaInfo si = new SchemaInfoImpl()
             .setName("test-schema-info-without-charset")
             .setType(SchemaType.STRING)
             .setSchema(new byte[0])
@@ -122,7 +122,7 @@
     public void testSchemaInfoWithCharset(Charset charset) {
         Map<String, String> properties = new HashMap<>();
         properties.put(StringSchema.CHARSET_KEY, charset.name());
-        SchemaInfo si = new SchemaInfo()
+        SchemaInfo si = new SchemaInfoImpl()
             .setName("test-schema-info-without-charset")
             .setType(SchemaType.STRING)
             .setSchema(new byte[0])
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index fd29ce7..05028a0 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -183,6 +183,10 @@
       <version>1.1.7.5</version>
       <scope>test</scope>
     </dependency>
+      <dependency>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+      </dependency>
 
   </dependencies>
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
index 0f0429e..c58fef3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
@@ -25,7 +25,6 @@
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Value;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AuthPolicies;
 
@@ -47,15 +46,17 @@
         return new AuthPoliciesImplBuilder();
     }
 
-    private static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
+
+    public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
         private Map<String, Set<AuthAction>> namespaceAuthentication = new TreeMap<>();
-        private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();
-        private Map<String, Set<String>> subscriptionAuthentication = new TreeMap<>();
+        private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();;
+        private Map<String, Set<String>> subscriptionAuthentication= new TreeMap<>();;
 
         AuthPoliciesImplBuilder() {
         }
 
-        public AuthPoliciesImplBuilder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication) {
+        public AuthPoliciesImplBuilder namespaceAuthentication(
+                Map<String, Set<AuthAction>> namespaceAuthentication) {
             this.namespaceAuthentication = namespaceAuthentication;
             return this;
         }
@@ -66,7 +67,8 @@
             return this;
         }
 
-        public AuthPoliciesImplBuilder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication) {
+        public AuthPoliciesImplBuilder subscriptionAuthentication(
+                Map<String, Set<String>> subscriptionAuthentication) {
             this.subscriptionAuthentication = subscriptionAuthentication;
             return this;
         }
@@ -74,5 +76,11 @@
         public AuthPoliciesImpl build() {
             return new AuthPoliciesImpl(namespaceAuthentication, topicAuthentication, subscriptionAuthentication);
         }
+
+        public String toString() {
+            return "AuthPoliciesImpl.AuthPoliciesImplBuilder(namespaceAuthentication=" + this.namespaceAuthentication
+                    + ", topicAuthentication=" + this.topicAuthentication + ", subscriptionAuthentication="
+                    + this.subscriptionAuthentication + ")";
+        }
     }
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
similarity index 96%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 9573526..5f36909 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -46,7 +46,7 @@
 
         @Override
         public SchemaInfo getSchemaInfo() {
-            return BytesSchema.BYTES.getSchemaInfo();
+            return Schema.BYTES.getSchemaInfo();
         }
 
         @Override
@@ -169,11 +169,12 @@
         properties.put(KV_ENCODING_TYPE, String.valueOf(keyValueEncodingType));
 
         // generate the final schema info
-        return new SchemaInfo()
-            .setName(schemaName)
-            .setType(SchemaType.KEY_VALUE)
-            .setSchema(schemaData)
-            .setProperties(properties);
+        return SchemaInfoImpl.builder()
+                .name(schemaName)
+                .type(SchemaType.KEY_VALUE)
+                .schema(schemaData)
+                .properties(properties)
+                .build();
     }
 
     private static void encodeSubSchemaInfoToParentSchemaProperties(SchemaInfo schemaInfo,
@@ -237,7 +238,7 @@
         } else {
             schemaProps = SchemaUtils.deserializeSchemaProperties(schemaPropsStr);
         }
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
             .name(schemaName)
             .type(schemaType)
             .schema(schemaData)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
new file mode 100644
index 0000000..ca8b6cc
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * Information about the schema.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
+@Builder
+public class SchemaInfoImpl implements SchemaInfo {
+
+    @EqualsAndHashCode.Exclude
+    private String name;
+
+    /**
+     * The schema data in AVRO JSON format.
+     */
+    private byte[] schema;
+
+    /**
+     * The type of schema (AVRO, JSON, PROTOBUF, etc..).
+     */
+    private SchemaType type;
+
+    /**
+     * Additional properties of the schema definition (implementation defined).
+     */
+    @Builder.Default
+    private Map<String, String> properties = Collections.emptyMap();
+
+    public String getSchemaDefinition() {
+        if (null == schema) {
+            return "";
+        }
+
+        switch (type) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF:
+            case PROTOBUF_NATIVE:
+                return new String(schema, UTF_8);
+            case KEY_VALUE:
+                KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(this);
+                return SchemaUtils.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
+            default:
+                return Base64.getEncoder().encodeToString(schema);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return SchemaUtils.jsonifySchemaInfo(this);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
similarity index 100%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index 5c00f06..d5b4405 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -45,7 +46,7 @@
      * @return the converted schema info.
      */
     public SchemaInfo toSchemaInfo() {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
             .name("")
             .type(type)
             .schema(data)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
deleted file mode 100644
index ac5997d..0000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.protocol.schema;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.TreeMap;
-import lombok.experimental.UtilityClass;
-
-import org.apache.pulsar.common.api.proto.KeyValue;
-import org.apache.pulsar.common.api.proto.Schema;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.schema.SchemaInfo;
-
-/**
- * Class helping to initialize schemas.
- */
-@UtilityClass
-public class SchemaInfoUtil {
-
-    public static SchemaInfo newSchemaInfo(String name, SchemaData data) {
-        SchemaInfo si = new SchemaInfo();
-        si.setName(name);
-        si.setSchema(data.getData());
-        si.setType(data.getType());
-        si.setProperties(data.getProps());
-        return si;
-    }
-
-    public static SchemaInfo newSchemaInfo(Schema schema) {
-        SchemaInfo si = new SchemaInfo();
-        si.setName(schema.getName());
-        si.setSchema(schema.getSchemaData());
-        si.setType(Commands.getSchemaType(schema.getType()));
-        if (schema.getPropertiesCount() == 0) {
-            si.setProperties(Collections.emptyMap());
-        } else {
-            si.setProperties(new TreeMap<>());
-            for (int i = 0; i < schema.getPropertiesCount(); i++) {
-                KeyValue kv = schema.getPropertyAt(i);
-                si.getProperties().put(kv.getKey(), kv.getValue());
-            }
-        }
-        return si;
-    }
-
-    public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) {
-        SchemaInfo si = new SchemaInfo();
-        si.setName(name);
-        si.setSchema(schema.getData().getBytes(StandardCharsets.UTF_8));
-        si.setType(schema.getType());
-        si.setProperties(schema.getProperties());
-        return si;
-    }
-}
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 169985b..bf700783 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -30,6 +30,19 @@
     <relativePath>..</relativePath>
   </parent>
 
+  <!--
+
+    THIS MODULE SHOULD ONLY CONTAIN THE INTERFACES THAT PULSAR FUNCTION'S FRAMEWORK USES TO INTERACT WITH USER CODE.
+    THIS MODULE SHOULD ONLY DEPEND ON:
+    1. pulsar-io-core
+    2. pulsar-functions-api
+    3. pulsar-client-api
+    4. slf4j-api
+    5. log4j-slf4j-impl
+    6. log4j-api
+    7. log4j-core
+  -->
+
   <artifactId>pulsar-functions-runtime-all</artifactId>
   <name>Pulsar Functions :: Runtime All</name>
 
@@ -48,7 +61,7 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-original</artifactId>
+      <artifactId>pulsar-client-api</artifactId>
       <version>${project.version}</version>
     </dependency>
 
@@ -77,6 +90,19 @@
 
   <build>
     <plugins>
+      <!--
+      Disable the maven-jar-plugin since we don't need the default jar and it conflicts with the maven-assembly-plugin
+      -->
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-jar</id>
+            <phase/>
+          </execution>
+        </executions>
+      </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
@@ -92,7 +118,7 @@
           <execution>
             <id>make-assembly</id>
             <!-- bind to the packaging phase -->
-            <phase>package</phase>
+            <phase>compile</phase>
             <goals>
               <goal>single</goal>
             </goals>
diff --git a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
index bd64bf7..6852792 100644
--- a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
+++ b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
@@ -36,6 +36,7 @@
  * This class will create three classloaders:
  *      1. The root classloader that will share interfaces between the function instance
  *      classloader and user code classloader. This classloader will contain the following dependencies
+ *          - pulsar-io-core
  *          - pulsar-functions-api
  *          - pulsar-client-api
  *          - log4j-slf4j-impl
diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
new file mode 100644
index 0000000..3bdd23f
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+@Slf4j
+/**
+ * This test serves to make sure that the correct classes are included in the java-instance.jar
+ * THAT JAR SHOULD ONLY CONTAIN THE INTERFACES THAT PULSAR FUNCTION'S FRAMEWORK USES TO INTERACT WITH USER CODE
+ * WHICH INCLUDES CLASSES FROM THE FOLLOWING LIBRARIES
+ *     1. pulsar-io-core
+ *     2. pulsar-functions-api
+ *     3. pulsar-client-api
+ *     4. slf4j-api
+ *     5. log4j-slf4j-impl
+ *     6. log4j-api
+ *     7. log4j-core
+ */
+public class JavaInstanceDepsTest {
+
+    @Test
+    public void testInstanceJarDeps() throws IOException {
+        File jar = new File("target/java-instance.jar");
+        
+        @Cleanup
+        ZipInputStream zip = new ZipInputStream(jar.toURI().toURL().openStream());
+
+        List<String> notAllowedClasses = new LinkedList<>();
+        while(true) {
+            ZipEntry e = zip.getNextEntry();
+            if (e == null)
+                break;
+            String name = e.getName();
+            if (name.endsWith(".class") && !name.startsWith("META-INF")) {
+                // The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes
+                // filter out those classes to see if there are any other classes that should not be allowed
+                if (!name.startsWith("org/apache/pulsar")
+                        && !name.startsWith("org/slf4j")
+                        && !name.startsWith("org/apache/logging/slf4j")
+                        && !name.startsWith("org/apache/logging/log4j")) {
+                    notAllowedClasses.add(name);
+                }
+            }
+        }
+
+        Assert.assertEquals(notAllowedClasses, Collections.emptyList(), notAllowedClasses.toString());
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index e0a9c64..474410c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -179,7 +179,6 @@
                 String.format("%s-%s",
                         FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
                         instanceConfig.getInstanceId()));
-        this.fnThread.setContextClassLoader(functionClassLoader);
         this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
index 2db9d6c..ba57692 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -44,7 +45,7 @@
         Map<String, String> props = new HashMap<>();
         boolean isJsonConverter = converter instanceof JsonConverter;
         props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
-        this.schemaInfo = SchemaInfo.builder()
+        this.schemaInfo = SchemaInfoImpl.builder()
                 .name(isJsonConverter? "KafKaJson" : "KafkaAvro")
                 .type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO)
                 .schema(schema.toString().getBytes(UTF_8))
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
index eda8c96..2a9e1c4 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -25,6 +25,7 @@
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -65,7 +66,7 @@
             org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
             String definition = schema.toString(false);
             log.info("Schema {} definition {}", schemaId, definition);
-            SchemaInfo schemaInfo = SchemaInfo.builder()
+            SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .type(SchemaType.AVRO)
                     .name(schema.getName())
                     .properties(Collections.emptyMap())
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 81bf031..47bedbe 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -235,7 +236,7 @@
 
      static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
         DeferredSchemaPlaceholder() {
-            super(SchemaInfo
+            super(SchemaInfoImpl
                     .builder()
                     .type(SchemaType.AVRO)
                     .properties(Collections.emptyMap())
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index b155e45..3f0bc09 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -450,7 +450,6 @@
   * Apache Zookeeper
     - zookeeper-3.6.3.jar
     - zookeeper-jute-3.6.3.jar
-    - zookeeper-prometheus-metrics-3.6.3.jar
   * Apache Yetus Audience Annotations
     - audience-annotations-0.5.0.jar
   * Swagger
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 03a6b77..645edbd 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -101,7 +102,7 @@
         this.offloadPolicies = offloadPolicies;
 
         ObjectMapper objectMapper = new ObjectMapper();
-        this.schemaInfo = SchemaInfo.builder()
+        this.schemaInfo = SchemaInfoImpl.builder()
                 .name(originSchemaName)
                 .type(schemaType)
                 .schema(schema.getBytes("ISO8859-1"))
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index cd31b4b..79fb789 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -23,6 +23,7 @@
 import io.prestosql.spi.connector.*;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -188,9 +189,10 @@
     @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
     public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        SchemaInfo badSchemaInfo = new SchemaInfo();
-        badSchemaInfo.setSchema(new byte[0]);
-        badSchemaInfo.setType(SchemaType.AVRO);
+        SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+                .schema(new byte[0])
+                .type(SchemaType.AVRO)
+                .build();
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
 
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
@@ -214,9 +216,10 @@
     @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
     public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        SchemaInfo badSchemaInfo = new SchemaInfo();
-        badSchemaInfo.setSchema("foo".getBytes());
-        badSchemaInfo.setType(SchemaType.AVRO);
+        SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+                .schema("foo".getBytes())
+                .type(SchemaType.AVRO)
+                .build();
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
 
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
index f4810ba..c1b97d3 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
@@ -22,6 +22,7 @@
 import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.sql.presto.PulsarColumnHandle;
@@ -65,7 +66,7 @@
     public void testPrimitiveType() {
 
         byte int8Value = 1;
-        SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
+        SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build();
         Schema schemaInt8 = Schema.getSchema(schemaInfoInt8);
         List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
         PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8,
@@ -77,7 +78,7 @@
                 PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value);
 
         short int16Value = 2;
-        SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build();
+        SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build();
         Schema schemaInt16 = Schema.getSchema(schemaInfoInt16);
         List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
         PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16,
@@ -89,7 +90,7 @@
                 PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value);
 
         int int32Value = 2;
-        SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build();
+        SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build();
         Schema schemaInt32 = Schema.getSchema(schemaInfoInt32);
         List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -102,7 +103,7 @@
                 PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value);
 
         long int64Value = 2;
-        SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build();
+        SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build();
         Schema schemaInt64 = Schema.getSchema(schemaInfoInt64);
         List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -116,7 +117,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), int64Value);
 
         String stringValue = "test";
-        SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build();
+        SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build();
         Schema schemaString = Schema.getSchema(schemaInfoString);
         List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -130,7 +131,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), stringValue);
 
         float floatValue = 0.2f;
-        SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build();
+        SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build();
         Schema schemaFloat = Schema.getSchema(schemaInfoFloat);
         List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -144,7 +145,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue)));
 
         double doubleValue = 0.22d;
-        SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build();
+        SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build();
         Schema schemaDouble = Schema.getSchema(schemaInfoDouble);
         List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -158,7 +159,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue);
 
         boolean booleanValue = true;
-        SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build();
+        SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build();
         Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean);
         List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -173,7 +174,7 @@
 
         byte[] bytesValue = new byte[1];
         bytesValue[0] = 1;
-        SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build();
+        SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build();
         Schema schemaBytes = Schema.getSchema(schemaInfoBytes);
         List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -187,7 +188,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue));
 
         Date dateValue = new Date(System.currentTimeMillis());
-        SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build();
+        SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build();
         Schema schemaDate = Schema.getSchema(schemaInfoDate);
         List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -201,7 +202,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime());
 
         Time timeValue = new Time(System.currentTimeMillis());
-        SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
+        SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build();
         Schema schemaTime = Schema.getSchema(schemaInfoTime);
         List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -215,7 +216,7 @@
                 PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime());
 
         Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
-        SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
+        SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build();
         Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp);
         List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index bb5190e..140168d 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -36,7 +36,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.pulsar</groupId>
-            <artifactId>pulsar-client-original</artifactId>
+            <artifactId>pulsar-client</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
@@ -82,24 +82,10 @@
                             </transformers>
                             <artifactSet>
                                 <includes>
-                                    <include>org.apache.pulsar:pulsar-client-original</include>
-                                    <include>org.apache.pulsar:pulsar-client-api</include>
-                                    <include>org.apache.pulsar:pulsar-client-admin-api</include>
+                                    <include>org.apache.pulsar:pulsar-client</include>
                                     <include>org.apache.pulsar:pulsar-functions-api-examples</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>org.apache.pulsar:pulsar-client-original</artifact>
-                                    <includes>
-                                        <include>**</include>
-                                    </includes>
-                                    <excludes>
-                                        <!-- bouncycastle jars could not be shaded, or the signatures will be wrong-->
-                                        <exclude>org/bouncycastle/**</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
index e0cc328..3c961fa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -59,42 +60,45 @@
         String outputTopicName = "test-state-source-output-" + randomName(8);
         String sourceName = "test-state-source-" + randomName(8);
         int numMessages = 10;
+        try {
+            submitSourceConnector(
+                    sourceName,
+                    outputTopicName,
+                    "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR);
 
-        submitSourceConnector(
-            sourceName,
-            outputTopicName,
-            "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR);
+            // get source info
+            getSourceInfoSuccess(container, sourceName);
 
-        // get source info
-        getSourceInfoSuccess(container, sourceName);
+            // get source status
+            getSourceStatus(container, sourceName);
 
-        // get source status
-        getSourceStatus(container, sourceName);
+            try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
 
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+                retryStrategically((test) -> {
+                    try {
+                        SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                        return status.getInstances().size() > 0
+                                && status.getInstances().get(0).getStatus().numWritten >= 10;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 10, 200);
 
-            retryStrategically((test) -> {
-                try {
-                    SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
-                    return status.getInstances().size() > 0
-                        && status.getInstances().get(0).getStatus().numWritten >= 10;
-                } catch (PulsarAdminException e) {
-                    return false;
-                }
-            }, 10, 200);
+                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
+            }
 
-            SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
-            assertEquals(status.getInstances().size(), 1);
-            assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
+            consumeMessages(container, outputTopicName, numMessages);
+
+            // delete source
+            deleteSource(container, sourceName);
+
+            getSourceInfoNotFound(container, sourceName);
+        } finally {
+            dumpFunctionLogs(sourceName);
         }
 
-        consumeMessages(container, outputTopicName, numMessages);
-
-        // delete source
-        deleteSource(container, sourceName);
-
-        getSourceInfoNotFound(container, sourceName);
-
     }
 
     private void submitSourceConnector(String sourceName,
@@ -129,15 +133,35 @@
     }
 
     private static void getSourceStatus(StandaloneContainer container,String sourceName) throws Exception {
+        retryStrategically((test) -> {
+                    try {
+                        ContainerExecResult result = container.execCmd(
+                                PulsarCluster.ADMIN_SCRIPT,
+                                "sources",
+                                "status",
+                                "--tenant", "public",
+                                "--namespace", "default",
+                                "--name", sourceName);
+
+                        if (result.getStdout().contains("\"running\" : true")) {
+                            return true;
+                        }
+                        return false;
+                    } catch (Exception e) {
+                        log.error("Encountered error when getting source status", e);
+                        return false;
+                    }
+                }, 10, 200);
+
         ContainerExecResult result = container.execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "sources",
-            "status",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", sourceName
-        );
-        assertTrue(result.getStdout().contains("\"running\" : true"));
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName);
+
+        Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
     }
 
     private static void consumeMessages(StandaloneContainer container, String outputTopic,