blob: fefb6bfb52d7684fa18549a37b41323e39460868 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.tagsync.process;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecureClientLogin;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.ranger.tagsync.model.TagSink;
import org.apache.ranger.tagsync.model.TagSource;
public class TagSynchronizer {
private static final Logger LOG = Logger.getLogger(TagSynchronizer.class);
private static final String AUTH_TYPE_KERBEROS = "kerberos";
private static final String TAGSYNC_SOURCE_BASE = "ranger.tagsync.source.";
private static final String PROP_CLASS_NAME = "class";
private TagSink tagSink = null;
private List<TagSource> tagSources = new ArrayList<TagSource>();
private List<TagSource> failedTagSources = new ArrayList<TagSource>();
private Properties properties = null;
private final Object shutdownNotifier = new Object();
private volatile boolean isShutdownInProgress = false;
public static void main(String[] args) {
TagSynchronizer tagSynchronizer = new TagSynchronizer();
TagSyncConfig config = TagSyncConfig.getInstance();
Properties props = config.getProperties();
tagSynchronizer.setProperties(props);
boolean tagSynchronizerInitialized = tagSynchronizer.initialize();
if (tagSynchronizerInitialized) {
try {
tagSynchronizer.run();
} catch (Throwable t) {
LOG.error("main thread caught exception..:", t);
System.exit(1);
}
} else {
LOG.error("TagSynchronizer failed to initialize correctly, exiting..");
System.exit(1);
}
}
TagSynchronizer() {
this(null);
}
TagSynchronizer(Properties properties) {
setProperties(properties);
}
void setProperties(Properties properties) {
if (properties == null || MapUtils.isEmpty(properties)) {
this.properties = new Properties();
} else {
this.properties = properties;
}
}
public boolean initialize() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.initialize()");
}
printConfigurationProperties(properties);
boolean ret = initializeKerberosIdentity(properties);
if (ret) {
LOG.info("Initializing TAG source and sink");
ret = false;
tagSink = initializeTagSink(properties);
if (tagSink != null) {
initializeTagSources();
ret = true;
}
} else {
LOG.error("Error initializing kerberos identity");
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagSynchronizer.initialize() : " + ret);
}
return ret;
}
public void run() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.run()");
}
isShutdownInProgress = false;
try {
boolean threadsStarted = tagSink.start();
if (LOG.isDebugEnabled()) {
LOG.debug("==> starting TagSyncMetricsProducer with default metrics location : "+System.getProperty("logdir"));
}
//Start the tag sync metrics
boolean isTagSyncMetricsEnabled = TagSyncConfig.isTagSyncMetricsEnabled(properties);
if (isTagSyncMetricsEnabled) {
TagSyncMetricsProducer tagSyncMetricsProducer = new TagSyncMetricsProducer();
Thread tagSyncMetricsProducerThread = new Thread(tagSyncMetricsProducer);
tagSyncMetricsProducerThread.setName("TagSyncMetricsProducerThread");
tagSyncMetricsProducerThread.setDaemon(true);
tagSyncMetricsProducerThread.start();
} else {
LOG.info(" Ranger tagsync metrics is not enabled");
}
for (TagSource tagSource : tagSources) {
threadsStarted = threadsStarted && tagSource.start();
}
if (threadsStarted) {
long tagSourceRetryInitializationInterval = TagSyncConfig.getTagSourceRetryInitializationInterval(properties);
synchronized(shutdownNotifier) {
while(! isShutdownInProgress) {
shutdownNotifier.wait(tagSourceRetryInitializationInterval);
if (CollectionUtils.isNotEmpty(failedTagSources)) {
reInitializeFailedTagSources();
}
}
}
}
} finally {
LOG.info("Stopping all tagSources");
for (TagSource tagSource : tagSources) {
tagSource.stop();
}
LOG.info("Stopping tagSink");
tagSink.stop();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagSynchronizer.run()");
}
}
public void shutdown(String reason) {
LOG.info("Received shutdown(), reason=" + reason);
synchronized(shutdownNotifier) {
isShutdownInProgress = true;
shutdownNotifier.notifyAll();
}
}
static public void printConfigurationProperties(Properties properties) {
LOG.info("--------------------------------");
LOG.info("");
LOG.info("Ranger-TagSync Configuration: {\n");
if (MapUtils.isNotEmpty(properties)) {
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
LOG.info("\tProperty-Name:" + entry.getKey());
LOG.info("\tProperty-Value:" + entry.getValue());
LOG.info("\n");
}
}
LOG.info("\n}");
LOG.info("");
LOG.info("--------------------------------");
}
static public TagSink initializeTagSink(Properties properties) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.initializeTagSink()");
}
TagSink ret = null;
try {
String tagSinkClassName = TagSyncConfig.getTagSinkClassName(properties);
if (LOG.isDebugEnabled()) {
LOG.debug("tagSinkClassName=" + tagSinkClassName);
}
@SuppressWarnings("unchecked")
Class<TagSink> tagSinkClass = (Class<TagSink>) Class.forName(tagSinkClassName);
ret = tagSinkClass.newInstance();
if (!ret.initialize(properties)) {
LOG.error("Failed to initialize TAG sink " + tagSinkClassName);
ret = null;
}
} catch (Throwable t) {
LOG.error("Failed to initialize TAG sink. Error details: ", t);
ret = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagSynchronizer.initializeTagSink(), result:" + (ret == null ? "false" : "true"));
}
return ret;
}
private void initializeTagSources() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.initializeTagSources()");
}
List<String> tagSourceNameList = new ArrayList<String>();
for (Object propNameObj : properties.keySet()) {
String propName = propNameObj.toString();
if (!propName.startsWith(TAGSYNC_SOURCE_BASE)) {
continue;
}
String tagSourceName = propName.substring(TAGSYNC_SOURCE_BASE.length());
List<String> splits = toArray(tagSourceName, ".");
if (splits.size() > 1) {
continue;
}
String value = properties.getProperty(propName);
if (value.equalsIgnoreCase("enable")
|| value.equalsIgnoreCase("enabled")
|| value.equalsIgnoreCase("true")) {
tagSourceNameList.add(tagSourceName);
LOG.info("Tag source " + propName + " is set to "
+ value);
}
}
List<String> initializedTagSourceNameList = new ArrayList<String>();
for (String tagSourceName : tagSourceNameList) {
String tagSourcePropPrefix = TAGSYNC_SOURCE_BASE + tagSourceName;
TagSource tagSource = getTagSourceFromConfig(properties,
tagSourcePropPrefix, tagSourceName);
if (tagSource != null) {
try {
if (!tagSource.initialize(properties)) {
LOG.error("Failed to initialize TAG source " + tagSourceName);
failedTagSources.add(tagSource);
} else {
tagSource.setTagSink(tagSink);
tagSources.add(tagSource);
initializedTagSourceNameList.add(tagSourceName);
}
} catch(Exception exception) {
LOG.error("tag-source:" + tagSourceName + " initialization failed with ", exception);
failedTagSources.add(tagSource);
}
}
}
if (CollectionUtils.isEmpty(tagSources)) {
LOG.warn("TagSync is not configured for any tag-sources. No tags will be received by TagSync.");
LOG.warn("Please recheck configuration properties and tagsync environment to ensure that this is correct.");
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagSynchronizer.initializeTagSources(initializedTagSources=" + initializedTagSourceNameList
+ ", failedTagSources=" + failedTagSources + ")");
}
}
private void reInitializeFailedTagSources() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.reInitializeFailedTagSources()");
}
for (int index = 0; index < failedTagSources.size(); index++) {
TagSource tagSource = failedTagSources.get(index);
try {
if (tagSource.initialize(properties)) {
failedTagSources.remove(index);
--index;
tagSources.add(tagSource);
tagSource.setTagSink(tagSink);
if (tagSource.start()) {
tagSources.add(tagSource);
} else {
LOG.error("Failed to start tagSource: " + tagSource);
}
} else {
LOG.error("Failed to initialize TAG source " + tagSource);
}
} catch (Exception exception) {
LOG.error("tag-source:" + tagSource + " initialization failed with ", exception);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagSynchronizer.reInitializeFailedTagSources()");
}
}
static private TagSource getTagSourceFromConfig(Properties props,
String propPrefix, String tagSourceName) {
TagSource tagSource = null;
String className = getStringProperty(props, propPrefix + "."
+ PROP_CLASS_NAME);
if (StringUtils.isBlank(className)) {
if (tagSourceName.equals("file")) {
className = "org.apache.ranger.tagsync.source.file.FileTagSource";
} else if (tagSourceName.equalsIgnoreCase("atlas")) {
className = "org.apache.ranger.tagsync.source.atlas.AtlasTagSource";
} else if (tagSourceName.equals("atlasrest")) {
className = "org.apache.ranger.tagsync.source.atlasrest.AtlasRESTTagSource";
} else {
LOG.error("tagSource name doesn't have any class associated with it. tagSourceName="
+ tagSourceName + ", propertyPrefix=" + propPrefix);
}
}
if (StringUtils.isNotBlank(className)) {
try {
@SuppressWarnings("unchecked")
Class<TagSource> tagSourceClass = (Class<TagSource>) Class.forName(className);
tagSource = tagSourceClass.newInstance();
if (LOG.isDebugEnabled()) {
LOG.debug("Created instance of " + className);
}
tagSource.setName(tagSourceName);
} catch (Exception e) {
LOG.fatal("Can't instantiate tagSource class for tagSourceName="
+ tagSourceName + ", className=" + className
+ ", propertyPrefix=" + propPrefix, e);
}
}
return tagSource;
}
public static boolean initializeKerberosIdentity(Properties props) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.initializeKerberosIdentity()");
}
boolean ret = false;
String authenticationType = TagSyncConfig.getAuthenticationType(props);
String principal = TagSyncConfig.getKerberosPrincipal(props);
String keytab = TagSyncConfig.getKerberosKeytab(props);
String nameRules = TagSyncConfig.getNameRules(props);
if (LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) {
LOG.debug("authenticationType=" + authenticationType);
LOG.debug("principal=" + principal);
LOG.debug("keytab" + keytab);
LOG.debug("nameRules=" + nameRules);
}
}
final boolean isKerberized = !StringUtils.isEmpty(authenticationType) && authenticationType.trim().equalsIgnoreCase(AUTH_TYPE_KERBEROS) && SecureClientLogin.isKerberosCredentialExists(principal, keytab);
if (isKerberized) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to get kerberos identitiy");
}
UserGroupInformation kerberosIdentity;
try {
UserGroupInformation.loginUserFromKeytab(principal, keytab);
kerberosIdentity = UserGroupInformation.getLoginUser();
if (kerberosIdentity != null) {
props.put(TagSyncConfig.TAGSYNC_KERBEROS_IDENTITY, kerberosIdentity.getUserName());
if (LOG.isDebugEnabled()) {
LOG.debug("Got UGI, user:[" + kerberosIdentity.getUserName() + "]");
}
ret = true;
} else {
LOG.error("KerberosIdentity is null!");
}
} catch (IOException exception) {
LOG.error("Failed to get UGI from principal:[" + principal + "], and keytab:[" + keytab + "]", exception);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Not configured for Kerberos Authentication");
}
props.remove(TagSyncConfig.TAGSYNC_KERBEROS_IDENTITY);
ret = true;
}
if (!ret) {
props.remove(TagSyncConfig.TAGSYNC_KERBEROS_IDENTITY);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagSynchronizer.initializeKerberosIdentity() : " + ret);
}
return ret;
}
private static String getStringProperty(Properties props, String propName) {
String ret = null;
if (props != null && propName != null) {
String val = props.getProperty(propName);
if (val != null) {
ret = val;
}
}
return ret;
}
private static List<String> toArray(String destListStr, String delim) {
List<String> list = new ArrayList<String>();
if (destListStr != null && !destListStr.isEmpty()) {
StringTokenizer tokenizer = new StringTokenizer(destListStr,
delim.trim());
while (tokenizer.hasMoreTokens()) {
list.add(tokenizer.nextToken());
}
}
return list;
}
}