blob: 21dae95d852533f811ebefedcff983a08959eec9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.Future;
/**
* Kafka-specific access point to the Atlas notification framework.
*/
@Component
@Order(3)
public class KafkaNotification extends AbstractNotification implements Service {
/** Logger used by this class. */
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
/** Prefix used to extract the Kafka subset of the application configuration. */
public static final String PROPERTY_PREFIX = "atlas.kafka";
/** Topic name obtained from {@code AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME}. */
public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
/** Topic name obtained from {@code AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME}. */
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
/** Suffix of the per-notification-type property that holds the consumer group id. */
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
/** Default for the "error.message.consumer_closed" setting; compared against exception messages in isKafkaConsumerOpen. */
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";
private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}
};
// Kafka client settings used to build producers and (after adding a group id) consumers.
private final Properties properties;
// Poll timeout in milliseconds passed to each AtlasKafkaConsumer created by createConsumers.
private final Long pollTimeOutMs;
// Consumer created on demand by getKafkaConsumer and reused while it is still open.
private KafkaConsumer consumer;
// Producer created on first send by createProducer; closed and reset to null in close().
private KafkaProducer producer;
// Exception message that isKafkaConsumerOpen interprets as "the consumer is closed".
private String consumerClosedErrorMsg;
// ----- Constructors ----------------------------------------------------
/**
 * Construct a KafkaNotification.
 *
 * Reads the {@code atlas.kafka} subset of the application configuration, converts it to
 * {@link Properties}, and then forces the serializer/deserializer classes, the offset reset
 * policy, and defaults for auto-commit, session timeout and max poll records.
 *
 * All overrides are written with {@link Properties#setProperty(String, String)} so that every
 * value is a String and remains readable through {@link Properties#getProperty(String)}
 * (which returns {@code null} for non-String values).
 *
 * @param applicationProperties the application properties used to configure Kafka
 *
 * @throws AtlasException if the notification interface can not be created
 */
@Inject
public KafkaNotification(Configuration applicationProperties) throws AtlasException {
    super(applicationProperties);
    LOG.info("==> KafkaNotification()");
    Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
    properties = ConfigurationConverter.getProperties(kafkaConf);
    pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
    consumerClosedErrorMsg = kafkaConf.getString("error.message.consumer_closed", DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE);
    //Override default configs
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    boolean oldApiCommitEnableFlag = kafkaConf.getBoolean("auto.commit.enable", false);
    //set old autocommit value if new autoCommit property is not set.
    boolean autoCommit = kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag);
    properties.setProperty("enable.auto.commit", String.valueOf(autoCommit));
    properties.setProperty("session.timeout.ms", kafkaConf.getString("session.timeout.ms", "30000"));
    // if no value is specified for max.poll.records, set to 1
    properties.setProperty("max.poll.records", String.valueOf(kafkaConf.getInt("max.poll.records", 1)));
    LOG.info("<== KafkaNotification()");
}
/**
 * Test-only constructor that uses the supplied Kafka properties as-is.
 *
 * The poll timeout is fixed at 1000 ms and the "consumer closed" message falls back to
 * {@code DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE}; without that assignment the field would stay
 * {@code null} and isKafkaConsumerOpen could never recognise a closed consumer.
 *
 * @param properties the Kafka client properties to use
 */
@VisibleForTesting
protected KafkaNotification(Properties properties) {
    super();
    LOG.info("==> KafkaNotification()");
    this.properties = properties;
    this.pollTimeOutMs = 1000L;
    this.consumerClosedErrorMsg = DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE;
    LOG.info("<== KafkaNotification()");
}
/**
 * Looks up the Kafka topic registered for a notification type.
 *
 * @param notificationType the notification type to resolve
 * @return the topic held in TOPIC_MAP for that type, or {@code null} if none is registered
 */
@VisibleForTesting
String getTopicName(NotificationType notificationType) {
    final String topicName = TOPIC_MAP.get(notificationType);
    return topicName;
}
// ----- Service ---------------------------------------------------------
/**
 * Service start hook. Only logs entry and exit; no Kafka client is created here.
 *
 * @throws AtlasException declared by the signature; this implementation never throws it
 */
@Override
public void start() throws AtlasException {
    LOG.info("==> KafkaNotification.start()");

    LOG.info("<== KafkaNotification.start()");
}
/**
 * Service stop hook. Only logs entry and exit; the producer is released in close(), not here.
 */
@Override
public void stop() {
    LOG.info("==> KafkaNotification.stop()");

    LOG.info("<== KafkaNotification.stop()");
}
// ----- NotificationInterface -------------------------------------------
/**
 * Creates consumers for the given notification type, taking the auto-commit flag from the
 * configured properties: "enable.auto.commit" first, then the legacy "auto.commit.enable",
 * defaulting to {@code false}.
 *
 * The flag is read through {@link #autoCommitSetting(String)} because the stored value may be
 * a non-String object (e.g. a Boolean), which {@link Properties#getProperty(String)} reports
 * as {@code null}.
 *
 * @param notificationType the type of notifications to consume
 * @param numConsumers     the requested number of consumers
 * @return the list of created consumers
 */
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
    String autoCommit = autoCommitSetting("enable.auto.commit");
    if (autoCommit == null) {
        autoCommit = autoCommitSetting("auto.commit.enable");
    }
    return createConsumers(notificationType, numConsumers, Boolean.parseBoolean(autoCommit));
}

// Returns the textual value of a property regardless of whether it was stored as a String or as another object type.
private String autoCommitSetting(String key) {
    String value = properties.getProperty(key);
    if (value == null) {
        Object raw = properties.get(key);
        if (raw != null) {
            value = raw.toString();
        }
    }
    return value;
}
/**
 * Creates the consumer list for a notification type with an explicit auto-commit setting.
 *
 * A single AtlasKafkaConsumer, wrapping the Kafka consumer returned by getKafkaConsumer, is
 * placed in the returned list; {@code numConsumers} is only logged.
 *
 * @param notificationType  the type of notifications to consume
 * @param numConsumers      the requested number of consumers (logged only)
 * @param autoCommitEnabled whether the Kafka consumer should auto-commit offsets
 * @return a mutable list holding the created consumer
 * @throws IllegalStateException if no group id is configured for the notification type
 */
@VisibleForTesting
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
    LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);

    final Properties groupScopedProps = getConsumerProperties(notificationType);
    final List<NotificationConsumer<T>> result = new ArrayList<>();

    AtlasKafkaConsumer wrapped = new AtlasKafkaConsumer(
            notificationType.getDeserializer(),
            getKafkaConsumer(groupScopedProps, notificationType, autoCommitEnabled),
            autoCommitEnabled,
            pollTimeOutMs);
    result.add(wrapped);

    LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
    return result;
}
/**
 * Closes the Kafka producer, if one was created, and clears the reference so that a later
 * send creates a fresh producer. The consumer is not touched here.
 */
@Override
public void close() {
    LOG.info("==> KafkaNotification.close()");

    final boolean hasProducer = (producer != null);
    if (hasProducer) {
        producer.close();
        producer = null;
    }

    LOG.info("<== KafkaNotification.close()");
}
// ----- AbstractNotification --------------------------------------------
/**
 * Sends the messages to the topic of the given notification type, creating the Kafka
 * producer first if it does not exist yet.
 *
 * @param type     the notification type that selects the target topic
 * @param messages the messages to send
 * @throws NotificationException if sending fails
 */
@Override
public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
    final boolean producerMissing = (producer == null);

    if (producerMissing) {
        createProducer();
    }

    sendInternalToProducer(producer, type, messages);
}
/**
 * Sends every message to the topic mapped to {@code type} and then waits for all sends to
 * complete.
 *
 * All records are handed to the producer first; the returned futures are awaited afterwards.
 * Messages whose future fails are collected and reported together in a single
 * {@link NotificationException} carrying the last failure as its cause.
 *
 * If the waiting thread is interrupted, the affected message is recorded as failed, the
 * remaining futures are still awaited, and the thread's interrupt status is restored before
 * returning or throwing. It is restored only after the loop, because re-interrupting inside
 * the loop would make every subsequent {@code get()} fail immediately.
 *
 * @param p        the producer to send with
 * @param type     the notification type that selects the target topic
 * @param messages the messages to send
 * @throws NotificationException if at least one message could not be sent
 */
@VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
    String topic = TOPIC_MAP.get(type);
    List<MessageContext> messageContexts = new ArrayList<>();
    for (String message : messages) {
        ProducerRecord record = new ProducerRecord(topic, message);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message for topic {}: {}", topic, message);
        }
        Future future = p.send(record);
        messageContexts.add(new MessageContext(future, message));
    }
    List<String> failedMessages = new ArrayList<>();
    Exception lastFailureException = null;
    boolean interrupted = false;
    for (MessageContext context : messageContexts) {
        try {
            RecordMetadata response = context.getFuture().get();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), response.partition(), response.offset());
            }
        } catch (InterruptedException e) {
            // Remember the interruption; the flag is restored once all futures were drained.
            interrupted = true;
            lastFailureException = e;
            failedMessages.add(context.getMessage());
        } catch (Exception e) {
            lastFailureException = e;
            failedMessages.add(context.getMessage());
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
    if (lastFailureException != null) {
        throw new NotificationException(lastFailureException, failedMessages);
    }
}
/**
 * Returns the Kafka consumer held by this instance, creating and subscribing a new one when
 * none exists yet or the existing one is reported closed by isKafkaConsumerOpen.
 *
 * Note that {@code consumerProperties} is modified: "enable.auto.commit" is overwritten with
 * {@code autoCommitEnabled}. Failures while creating or subscribing the consumer are logged
 * and not rethrown, so the returned value may be {@code null} or a previously held consumer.
 *
 * @param consumerProperties the properties used to build a new consumer (mutated)
 * @param type               the notification type that selects the topic to subscribe to
 * @param autoCommitEnabled  whether the new consumer should auto-commit offsets
 * @return the current consumer reference, possibly {@code null}
 */
public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
    final boolean reusable = consumer != null && isKafkaConsumerOpen(consumer);

    if (!reusable) {
        try {
            final String subscribedTopic = TOPIC_MAP.get(type);

            consumerProperties.put("enable.auto.commit", autoCommitEnabled);

            this.consumer = new KafkaConsumer(consumerProperties);
            this.consumer.subscribe(Collections.singletonList(subscribedTopic));
        } catch (Exception failure) {
            LOG.error("Exception in getKafkaConsumer ", failure);
        }
    }

    return this.consumer;
}
/**
 * Builds the properties for a consumer of the given notification type.
 *
 * The group id is read from the property {@code <type>.group.id}, where {@code <type>} is the
 * lower-cased notification type name. The name is lower-cased with {@link Locale#ROOT} so the
 * key does not depend on the JVM default locale (e.g. under a Turkish locale "ENTITIES" would
 * otherwise be lower-cased with a dotless i and the lookup would fail).
 *
 * @param type the notification type the consumer is created for
 * @return a copy of the configured properties with {@code group.id} set
 * @throws IllegalStateException if no group id is configured for the notification type
 */
@VisibleForTesting
public Properties getConsumerProperties(NotificationType type) {
    // find the configured group id for the given notification type
    String groupIdKey = type.toString().toLowerCase(Locale.ROOT) + "." + CONSUMER_GROUP_ID_PROPERTY;
    String groupId = properties.getProperty(groupIdKey);
    if (StringUtils.isEmpty(groupId)) {
        throw new IllegalStateException("No configuration group id set for the notification type " + type);
    }
    // Copy so that per-consumer changes never leak into the shared configuration.
    Properties consumerProperties = new Properties();
    consumerProperties.putAll(properties);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    return consumerProperties;
}
/**
 * Creates the Kafka producer from the configured properties unless one already exists.
 * The method is synchronized and re-checks the field, so only one producer is created when
 * several callers enter at once.
 */
private synchronized void createProducer() {
    LOG.info("==> KafkaNotification.createProducer()");

    final boolean alreadyCreated = (producer != null);
    if (!alreadyCreated) {
        producer = new KafkaProducer(properties);
    }

    LOG.info("<== KafkaNotification.createProducer()");
}
private class MessageContext {
private final Future<RecordMetadata> future;
private final String message;
public MessageContext(Future<RecordMetadata> future, String message) {
this.future = future;
this.message = message;
}
public Future<RecordMetadata> getFuture() {
return future;
}
public String getMessage() {
return message;
}
}
/**
 * Reports whether the given consumer is still open.
 *
 * kafka-client doesn't have a method to check if a consumer is open, hence this calls
 * {@code listTopics()} and inspects the resulting exception: an IllegalStateException whose
 * message equals (ignoring case) the configured "consumer closed" message means closed.
 * Any other IllegalStateException is treated as "open"; other exception types propagate.
 *
 * The comparison is null-safe: an exception without a message, or an unset configured
 * message, is treated as "not the closed-consumer error" instead of raising a
 * NullPointerException.
 *
 * @param consumer the consumer to probe
 * @return {@code false} only when the probe fails with the "consumer closed" message
 */
private boolean isKafkaConsumerOpen(KafkaConsumer consumer) {
    try {
        consumer.listTopics();
        return true;
    } catch (IllegalStateException ex) {
        String message = ex.getMessage();
        boolean closed = message != null && message.equalsIgnoreCase(consumerClosedErrorMsg);
        return !closed;
    }
}
}