blob: d1caeee4c22d58d188630c78ac73186f678eada9 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.manager.client.consumer;
import io.streamnative.pulsar.manager.client.Client;
import io.streamnative.pulsar.manager.client.PulsarApplicationListener;
import io.streamnative.pulsar.manager.client.annotation.PulsarListener;
import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
import io.streamnative.pulsar.manager.client.config.PulsarConsumerConfigRegister;
import io.streamnative.pulsar.manager.client.utils.ParseAnnotation;
import io.streamnative.pulsar.manager.client.utils.TestMessage;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Pulsar Consumer container test.
*/
public class PulsarConsumerContainerTest {
private final TestMessage testMessage = new TestMessage();
private static final String receiveMessage = "hello-world";
static class Foo {
private String field1;
private String field2;
private int field3;
}
private class PulsarListenerAnnotationByte {
@PulsarListener(topics = "test", subscriptionName = "xxxx")
public void testReceive(Message message) {
Assert.assertEquals(new String(message.getData()), receiveMessage);
}
}
private class PulsarListenerAnnotationAvro {
@PulsarListener(id = "test-container", topics = "test2",
subscriptionName = "xxxx2", schema = Foo.class, schemaType = SchemaType.AVRO)
public void testReceive(Message message) {
Foo foo = (Foo) message.getValue();
Assert.assertEquals(foo.field1, "a");
Assert.assertEquals(foo.field2, "b");
Assert.assertEquals(foo.field3, 4);
}
}
@Test
public void testPulsarConsumerContainer() throws PulsarClientException, InterruptedException {
Client client = mock(Client.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
Consumer consumer = mock(Consumer.class);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
PulsarApplicationListener pulsarApplicationListener = mock(PulsarApplicationListener.class);
PulsarConsumerConfigRegister pulsarConsumerConfigRegister = new PulsarConsumerConfigRegister();
PulsarListenerAnnotationByte pulsarListenerAnnotation = new PulsarListenerAnnotationByte();
when(pulsarApplicationListener.getClient()).thenReturn(client);
pulsarConsumerConfigRegister.setPulsarApplicationListener(pulsarApplicationListener);
ParseAnnotation.parse(pulsarListenerAnnotation);
for (ConsumerConfigurationData consumerConfigurationData : ParseAnnotation.CONSUMER_CONFIGURATION_DATA) {
pulsarConsumerConfigRegister.setConsumerContainer(consumerConfigurationData);
}
when(client.getPulsarClient()).thenReturn(pulsarClient);
when(pulsarClient.newConsumer(Schema.BYTES)).thenReturn(consumerBuilder);
when(consumerBuilder.subscribe()).thenReturn(consumer);
when(consumer.toString()).thenReturn("consumer");
testMessage.setData(receiveMessage.getBytes());
when(consumer.receive()).thenReturn(testMessage);
pulsarConsumerConfigRegister.afterPropertiesSet();
Thread.sleep(10);
pulsarConsumerConfigRegister.stopAllContainers();
verify(consumer, atLeast(1)).receive();
verify(consumer, atLeast(1)).acknowledgeAsync(testMessage);
}
@Test
public void testPulsarConsumerContainerAvro() throws PulsarClientException, InterruptedException {
Client client = mock(Client.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
Consumer consumer = mock(Consumer.class);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
PulsarApplicationListener pulsarApplicationListener = mock(PulsarApplicationListener.class);
PulsarConsumerConfigRegister pulsarConsumerConfigRegister = new PulsarConsumerConfigRegister();
PulsarListenerAnnotationAvro pulsarListenerAnnotation = new PulsarListenerAnnotationAvro();
when(pulsarApplicationListener.getClient()).thenReturn(client);
pulsarConsumerConfigRegister.setPulsarApplicationListener(pulsarApplicationListener);
ParseAnnotation.parse(pulsarListenerAnnotation);
for (ConsumerConfigurationData consumerConfigurationData : ParseAnnotation.CONSUMER_CONFIGURATION_DATA) {
pulsarConsumerConfigRegister.setConsumerContainer(consumerConfigurationData);
}
when(client.getPulsarClient()).thenReturn(pulsarClient);
Schema fooSchema = Schema.AVRO(Foo.class);
PulsarConsumerContainer pulsarConsumerContainer = pulsarConsumerConfigRegister
.getConsumerContainer("test-container");
pulsarConsumerContainer.getPulsarConsumer().setSchema(fooSchema);
when(pulsarClient.newConsumer(fooSchema)).thenReturn(consumerBuilder);
when(consumerBuilder.subscribe()).thenReturn(consumer);
when(consumer.toString()).thenReturn("consumer");
Foo foo = new Foo();
foo.field1 = "a";
foo.field2 = "b";
foo.field3 = 4;
testMessage.setValue(foo);
when(consumer.receive()).thenReturn(testMessage);
pulsarConsumerConfigRegister.afterPropertiesSet();
Thread.sleep(10);
pulsarConsumerConfigRegister.stopAllContainers();
verify(consumer, atLeast(1)).receive();
verify(consumer, atLeast(1)).acknowledgeAsync(testMessage);
}
}