HBASE-25388 Using an extension of MockProducer on testing side (#76)
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
diff --git a/kafka/hbase-kafka-proxy/pom.xml b/kafka/hbase-kafka-proxy/pom.xml
index a4372d4..356838d 100755
--- a/kafka/hbase-kafka-proxy/pom.xml
+++ b/kafka/hbase-kafka-proxy/pom.xml
@@ -161,6 +161,13 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <version>${kafka-clients.version}</version>
+ </dependency>
<!-- General dependencies -->
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
index 7b767ca..e800501 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
@@ -19,35 +19,28 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.test.MockSerializer;
/**
* Mocks Kafka producer for testing
*/
-public class ProducerForTesting implements Producer<byte[], byte[]> {
+public class ProducerForTesting extends MockProducer<byte[], byte[]> {
Map<String, List<HBaseKafkaEvent>> messages = new HashMap<>();
SpecificDatumReader<HBaseKafkaEvent> dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
- public Map<String, List<HBaseKafkaEvent>> getMessages() {
- return messages;
+ public ProducerForTesting() {
+ super(true, new MockSerializer(), new MockSerializer());
}
- @Override
- public void abortTransaction() throws ProducerFencedException {
+ public Map<String, List<HBaseKafkaEvent>> getMessages() {
+ return messages;
}
@Override
@@ -59,79 +52,9 @@
messages.put(producerRecord.topic(), new ArrayList<>());
}
messages.get(producerRecord.topic()).add(event);
- return new Future<RecordMetadata>() {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return false;
- }
-
- @Override
- public RecordMetadata get() {
- return new RecordMetadata(null, 1, 1, 1, (long)1, 1, 1);
- }
-
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) {
- return null;
- }
- };
+ return super.send(producerRecord);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
-
- @Override
- public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord,
- Callback callback) {
- return null;
- }
-
- @Override
- public void flush() {
- }
-
- @Override
- public List<PartitionInfo> partitionsFor(String s) {
- return null;
- }
-
- @Override
- public Map<MetricName, ? extends Metric> metrics() {
- return null;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void close(long l, TimeUnit timeUnit) {
- }
-
- @Override
- public void initTransactions() {
- }
-
- @Override
- public void beginTransaction() throws ProducerFencedException {
- }
-
- @Override
- public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) throws ProducerFencedException {
- }
-
- @Override
- public void commitTransaction() throws ProducerFencedException {
- }
}