commit | 2d171957af174e60637d566b9d0db6f39034a746 | [log] [tgz] |
---|---|---|
author | congbo <39078850+congbobo184@users.noreply.github.com> | Fri Mar 06 14:28:30 2020 +0800 |
committer | GitHub <noreply@github.com> | Thu Mar 05 22:28:30 2020 -0800 |
tree | 8fb69f14d6f0675f7863119f8e54733c6d66cdb8 | |
parent | 0a8864a36c97b7dd17feeabd4b112dbac6045843 [diff] |
Independent schema is set for each consumer generated by topic (#6356) ### Motivation Master Issue: #5454 When one Consumer subscribe multi topic, setSchemaInfoPorvider() will be covered by the consumer generated by the last topic. ### Modification clone schema for each consumer generated by topic. ### Verifying this change Add the schemaTest for it.
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java index 807f482..aef6dd1 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -74,4 +74,9 @@ public SchemaInfo getSchemaInfo() { return Schema.BYTES.getSchemaInfo(); } + + @Override + public Schema<T> clone() { + return this; + } }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java index 807f482..aef6dd1 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -74,4 +74,9 @@ public SchemaInfo getSchemaInfo() { return Schema.BYTES.getSchemaInfo(); } + + @Override + public Schema<T> clone() { + return this; + } }