blob: 7e9f7897d1a265aa40b2b79f8d4de541b2120a61 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class KafkaUtils implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
private static final String JAAS_PRINCIPAL_PROP = "principal";
private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
private static final String IMPORT_INTERNAL_TOPICS = "atlas.hook.kafka.import.internal.topics";
public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
final protected Properties kafkaConfiguration;
final protected AdminClient adminClient;
final protected boolean importInternalTopics;
public KafkaUtils(Configuration atlasConfiguration) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils() ");
}
this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(atlasConfiguration, ATLAS_KAFKA_PROPERTY_PREFIX);
setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
adminClient = AdminClient.create(this.kafkaConfiguration);
importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils() ");
}
}
public void createTopics(List<String> topicNames, int numPartitions, int replicationFactor)
throws TopicExistsException, ExecutionException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> createTopics() ");
}
List<NewTopic> newTopicList = topicNames.stream()
.map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
.collect(Collectors.toList());
CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();
for(Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) {
String topicName = futureEntry.getKey();
KafkaFuture<Void> future = futureEntry.getValue();
future.get();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== createTopics() ");
}
}
public List<String> listAllTopics() throws ExecutionException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.listAllTopics() ");
}
ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics));
List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get());
if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.listAllTopics() ");
}
return topicNameList;
}
public Integer getPartitionCount(String topicName) throws ExecutionException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName);
}
Integer partitionCount = null;
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values();
for(Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) {
KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue();
TopicDescription topicDescription = topicDescriptionFuture.get();
List<TopicPartitionInfo> partitionList = topicDescription.partitions();
partitionCount = partitionList.size();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, partitionCount);
}
return partitionCount;
}
public void close() {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.close()");
}
if(adminClient != null) {
adminClient.close();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.close()");
}
}
public static void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");
}
if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
LOG.debug("JAAS config is already set, returning");
return;
}
Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
// JAAS Configuration is present then update set those properties in sasl.jaas.config
if(jaasConfig != null && !jaasConfig.isEmpty()) {
String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
LOG.debug("Checking if ticketBased-KafkaClient is set");
// if ticketBased-KafkaClient property is not specified then use the default client name
String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix);
if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
} else {
LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
}
}
String keyPrefix = jaasClientName + ".";
String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String loginModuleName = jaasConfig.getProperty(keyParam);
if (loginModuleName == null) {
LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
return;
}
keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
String controlFlag = jaasConfig.getProperty(keyParam);
if(StringUtils.isEmpty(controlFlag)) {
String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues);
}
String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
int optionPrefixLen = optionPrefix.length();
StringBuffer optionStringBuffer = new StringBuffer();
for (String key : jaasConfig.stringPropertyNames()) {
if (key.startsWith(optionPrefix)) {
String optionVal = jaasConfig.getProperty(key);
if (optionVal != null) {
optionVal = optionVal.trim();
try {
if (key.equalsIgnoreCase(principalOptionKey)) {
optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
}
} catch (IOException e) {
LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal);
}
optionVal = surroundWithQuotes(optionVal);
optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
}
}
}
String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
}
}
static boolean isLoginKeytabBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginKeytabBased();
} catch (Exception excp) {
LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
}
return ret;
}
public static boolean isLoginTicketBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginTicketBased();
} catch (Exception excp) {
LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
}
return ret;
}
static String surroundWithQuotes(String optionVal) {
if(StringUtils.isEmpty(optionVal)) {
return optionVal;
}
String ret = optionVal;
// For property values which have special chars like "@" or "/", we need to enclose it in
// double quotes, so that Kafka can parse it
// If the property is already enclosed in double quotes, then do nothing.
if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
// If the string as special characters like except _,-
final String SPECIAL_CHAR_LIST = "/!@#%^&*";
if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
ret = String.format("\"%s\"", optionVal);
}
}
return ret;
}
}