// blob: c69574103e360fcfb7dfb525bb06043a1fd647c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.util.Properties;
/**
* A class to create Kafka topics used by Atlas components.
*
* Use this class to create a Kafka topic with specific configuration like number of partitions, replicas, etc.
*/
public class AtlasTopicCreator {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);

    /** Boolean property that switches topic creation on or off; treated as {@code true} when absent. */
    public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";

    /**
     * Create the given Atlas topics, on a best-effort basis.
     *
     * A topic is created only when both of the following hold:
     * <ul>
     *   <li>{@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY} is true (the default when it is not set).</li>
     *   <li>The topic does not already exist.</li>
     * </ul>
     * If {@link #handleSecurity(Configuration)} reports a failure, no topic is created.
     *
     * Note that despite the existence check, multiple topic creation calls may still happen in parallel
     * because hooks run in a distributed fashion. Failures while connecting to ZooKeeper, while creating
     * a topic, or while closing the ZooKeeper connection are caught and logged by this method so that the
     * startup of the hooks does not fail. A failure for one topic does not stop the remaining topics from
     * being attempted.
     *
     * @param atlasProperties {@link Configuration} containing properties to be used for creating topics.
     * @param topicNames list of topics to create
     */
    public void createAtlasTopic(Configuration atlasProperties, String... topicNames) {
        if (!atlasProperties.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)) {
            LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
                    ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
            return;
        }
        if (!handleSecurity(atlasProperties)) {
            return;
        }

        ZkUtils zkUtils = null;
        try {
            // Connecting can fail (e.g. ZooKeeper unreachable within the configured timeout); that must
            // not escape to the caller, so the connection is established inside the try block.
            zkUtils = createZkUtils(atlasProperties);
            for (String topicName : topicNames) {
                try {
                    LOG.warn("Attempting to create topic {}", topicName);
                    if (!ifTopicExists(topicName, zkUtils)) {
                        createTopic(atlasProperties, topicName, zkUtils);
                    } else {
                        LOG.warn("Ignoring call to create topic {}, as it already exists.", topicName);
                    }
                } catch (Throwable t) {
                    // Deliberately broad: one failing topic must not prevent the others from being tried.
                    LOG.error("Failed while creating topic {}", topicName, t);
                }
            }
        } catch (Exception e) {
            LOG.error("Failed while connecting to ZooKeeper to create topics {}",
                    StringUtils.join(topicNames, ","), e);
        } finally {
            closeQuietly(zkUtils);
        }
    }

    /**
     * Close the ZooKeeper utilities, logging instead of propagating any failure.
     *
     * @param zkUtils the instance to close; may be null when the connection was never established
     */
    private void closeQuietly(ZkUtils zkUtils) {
        if (zkUtils == null) {
            return;
        }
        try {
            zkUtils.close();
        } catch (Exception e) {
            LOG.warn("Failed while closing ZooKeeper connection used for topic creation", e);
        }
    }

    /**
     * Perform the Kerberos keytab login when Kerberos authentication is enabled in the given properties.
     *
     * The principal is read from {@code atlas.notification.kafka.service.principal} and the keytab from
     * {@code atlas.notification.kafka.keytab.location}. When Kerberos is not enabled nothing is done.
     *
     * @param atlasProperties {@link Configuration} holding the authentication settings
     * @return false if the keytab login failed with an {@link IOException}, true otherwise
     */
    @VisibleForTesting
    protected boolean handleSecurity(Configuration atlasProperties) {
        if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
            String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
            String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
            SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConf);
            try {
                String serverPrincipal = SecurityUtil.getServerPrincipal(kafkaPrincipal, (String) null);
                UserGroupInformation.setConfiguration(hadoopConf);
                UserGroupInformation.loginUserFromKeytab(serverPrincipal, kafkaKeyTab);
            } catch (IOException e) {
                LOG.warn("Could not login as {} from keytab file {}", kafkaPrincipal, kafkaKeyTab, e);
                return false;
            }
        }
        return true;
    }

    /**
     * Check whether a topic already exists.
     *
     * @param topicName name of the topic to look up
     * @param zkUtils ZooKeeper utilities used for the lookup
     * @return the result of {@link AdminUtils#topicExists} for the given topic
     */
    @VisibleForTesting
    protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
        return AdminUtils.topicExists(zkUtils, topicName);
    }

    /**
     * Create a single topic.
     *
     * The partition count is read from {@code atlas.notification.hook.numthreads} and the replica count
     * from {@code atlas.notification.replicas}; both default to 1. The topic is created with empty
     * topic-level properties and {@code RackAwareMode.Enforced}.
     *
     * @param atlasProperties {@link Configuration} supplying partition and replica counts
     * @param topicName name of the topic to create
     * @param zkUtils ZooKeeper utilities used to create the topic
     */
    @VisibleForTesting
    protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
        int numPartitions = atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
        int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
        AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplicas,
                new Properties(), RackAwareMode.Enforced$.MODULE$);
        LOG.warn("Created topic {} with partitions {} and replicas {}", topicName, numPartitions, numReplicas);
    }

    /**
     * Open a ZooKeeper connection and wrap it in a {@link ZkUtils}.
     *
     * The connect string is read from {@code atlas.kafka.zookeeper.connect}, the session timeout from
     * {@code atlas.kafka.zookeeper.session.timeout.ms} (default 400) and the connection timeout from
     * {@code atlas.kafka.zookeeper.connection.timeout.ms} (default 200). The caller is responsible for
     * closing the returned instance.
     *
     * @param atlasProperties {@link Configuration} supplying the ZooKeeper settings
     * @return a new {@link ZkUtils} built from the created client and connection
     */
    @VisibleForTesting
    protected ZkUtils createZkUtils(Configuration atlasProperties) {
        String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect");
        int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
        int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
        Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(
                zkConnect, sessionTimeout, connectionTimeout);
        return new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false);
    }

    /**
     * Command-line entry point: creates the topics named on the command line using the Atlas
     * application properties.
     *
     * @param args names of the topics to create
     * @throws AtlasException if the application properties cannot be obtained
     */
    public static void main(String[] args) throws AtlasException {
        Configuration configuration = ApplicationProperties.get();
        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
        atlasTopicCreator.createAtlasTopic(configuration, args);
    }
}