blob: 06207b946e59496ef7e8e74a7c6869adf25daaa3 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.manager.client.annotation;
import io.streamnative.pulsar.manager.client.PulsarApplicationListener;
import io.streamnative.pulsar.manager.client.config.PulsarBootstrapConfiguration;
import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
import io.streamnative.pulsar.manager.client.config.PulsarConsumerConfigRegister;
import io.streamnative.pulsar.manager.client.consumer.PulsarConsumerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Parse annotation, for example PulsarListener.
*/
public class PulsarListenerPostProcessor implements BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton {
private static final Logger log = LoggerFactory.getLogger(PulsarListenerPostProcessor.class);
private BeanFactory beanFactory;
private final PulsarConsumerConfigRegister pulsarConsumerConfigRegister = new PulsarConsumerConfigRegister();
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(
new ConcurrentHashMap<>(64));
private static final String PREFIX_ID = "spring.pulsar.container#";
private final AtomicInteger counter = new AtomicInteger();
@Override
public void setBeanFactory(BeanFactory beanFactory) {
log.info("Start set bean Factory");
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
PulsarApplicationListener pulsarApplicationListener = this.beanFactory.getBean(
PulsarBootstrapConfiguration.PULSAR_APPLICATION_LISTENER, PulsarApplicationListener.class);
this.pulsarConsumerConfigRegister.setPulsarApplicationListener(pulsarApplicationListener);
}
}
@Override
public void afterSingletonsInstantiated() {
this.pulsarConsumerConfigRegister.afterPropertiesSet();
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
// Parse annotation
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Map<Method, Set<PulsarListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<PulsarListener>>) method -> {
Set<PulsarListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
} else {
// Custom annotation
for (Map.Entry<Method, Set<PulsarListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (PulsarListener listener : entry.getValue()) {
processPulsarListener(listener, method, bean, beanName);
}
}
}
}
return bean;
}
private Set<PulsarListener> findListenerAnnotations(Method method) {
Set<PulsarListener> listeners = new HashSet<>();
PulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(method, PulsarListener.class);
if (ann != null) {
listeners.add(ann);
}
return listeners;
}
private void processPulsarListener(PulsarListener pulsarListener, Method method, Object bean, String beanName) {
ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData();
if (pulsarListener.id().length() > 0) {
consumerConfigurationData.setId(pulsarListener.id());
} else {
consumerConfigurationData.setId(PREFIX_ID + this.counter.getAndIncrement());
}
consumerConfigurationData.setSubscriptionName(pulsarListener.subscriptionName());
consumerConfigurationData.setSubscriptionType(pulsarListener.subscriptionType());
consumerConfigurationData.setAckTimeout(pulsarListener.ackTimeout());
consumerConfigurationData.setAcknowledgmentGroupTime(pulsarListener.acknowledgmentGroupTime());
consumerConfigurationData.setNegativeAckRedeliveryDelay(pulsarListener.negativeAckRedeliveryDelay());
consumerConfigurationData.setReceiverQueueSize(pulsarListener.receiverQueueSize());
consumerConfigurationData.setAcknowledgmentGroupTime(pulsarListener.acknowledgmentGroupTime());
consumerConfigurationData.setTopics(pulsarListener.topics());
consumerConfigurationData.setPriorityLevel(pulsarListener.priorityLevel());
consumerConfigurationData.setTopicsPattern(pulsarListener.topicsPattern());
consumerConfigurationData.setAutoUpdatePartitions(pulsarListener.autoUpdatePartitions());
consumerConfigurationData.setConsumerName(pulsarListener.consumerName());
consumerConfigurationData.setRegexSubscriptionMode(pulsarListener.regexSubscriptionMode());
consumerConfigurationData.setMethod(method);
consumerConfigurationData.setBean(bean);
consumerConfigurationData.setSchema(pulsarListener.schema());
consumerConfigurationData.setSchemaType(pulsarListener.schemaType());
log.info("Initialization configured to {} use configuration {}", beanName, consumerConfigurationData.toString());
this.pulsarConsumerConfigRegister.setConsumerContainer(consumerConfigurationData);
}
public PulsarConsumerContainer getPulsarConsumerContainer(String id) {
return this.pulsarConsumerConfigRegister.getConsumerContainer(id);
}
}