blob: 50351f0ee8422604b86ab8af7c8dcbcb6a76a2c6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
* A factory to create objects related to Curator.
*
* Allows for stubbing in tests.
*/
@Singleton
public class CuratorFactory {
private static final Logger LOG = LoggerFactory.getLogger(CuratorFactory.class);
public static final String APACHE_ATLAS_LEADER_ELECTOR_PATH = "/leader_elector_path";
public static final String SASL_SCHEME = "sasl";
public static final String WORLD_SCHEME = "world";
public static final String ANYONE_ID = "anyone";
public static final String AUTH_SCHEME = "auth";
public static final String DIGEST_SCHEME = "digest";
public static final String IP_SCHEME = "ip";
public static final String SETUP_LOCK = "/setup_lock";
private final Configuration configuration;
private CuratorFramework curatorFramework;
/**
* Initializes the {@link CuratorFramework} that is used for all interaction with Zookeeper.
* @throws AtlasException
*/
public CuratorFactory() throws AtlasException {
this(ApplicationProperties.get());
}
public CuratorFactory(Configuration configuration) {
this.configuration = configuration;
initializeCuratorFramework();
}
@VisibleForTesting
protected void initializeCuratorFramework() {
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
CuratorFrameworkFactory.Builder builder = getBuilder(zookeeperProperties);
enhanceBuilderWithSecurityParameters(zookeeperProperties, builder);
curatorFramework = builder.build();
curatorFramework.start();
}
@VisibleForTesting
void enhanceBuilderWithSecurityParameters(HAConfiguration.ZookeeperProperties zookeeperProperties,
CuratorFrameworkFactory.Builder builder) {
ACLProvider aclProvider = getAclProvider(zookeeperProperties);
AuthInfo authInfo = null;
if (zookeeperProperties.hasAuth()) {
authInfo = AtlasZookeeperSecurityProperties.parseAuth(zookeeperProperties.getAuth());
}
if (aclProvider != null) {
LOG.info("Setting up acl provider.");
builder.aclProvider(aclProvider);
if (authInfo != null) {
byte[] auth = authInfo.getAuth();
LOG.info("Setting up auth provider with scheme: {} and id: {}", authInfo.getScheme(),
getIdForLogging(authInfo.getScheme(), new String(auth, Charsets.UTF_8)));
builder.authorization(authInfo.getScheme(), auth);
}
}
}
private String getCurrentUser() {
try {
return UserGroupInformation.getCurrentUser().getUserName();
} catch (IOException ioe) {
return "unknown";
}
}
private ACLProvider getAclProvider(HAConfiguration.ZookeeperProperties zookeeperProperties) {
ACLProvider aclProvider = null;
if (zookeeperProperties.hasAcl()) {
final ACL acl = AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl());
LOG.info("Setting ACL for id {} with scheme {} and perms {}.",
getIdForLogging(acl.getId().getScheme(), acl.getId().getId()),
acl.getId().getScheme(), acl.getPerms());
LOG.info("Current logged in user: {}", getCurrentUser());
final List<ACL> acls = Arrays.asList(acl);
aclProvider = new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return acls;
}
@Override
public List<ACL> getAclForPath(String path) {
return acls;
}
};
}
return aclProvider;
}
private String getIdForLogging(String scheme, String id) {
if (scheme.equalsIgnoreCase(SASL_SCHEME) ||
scheme.equalsIgnoreCase(IP_SCHEME)) {
return id;
} else if (scheme.equalsIgnoreCase(WORLD_SCHEME)) {
return ANYONE_ID;
} else if (scheme.equalsIgnoreCase(AUTH_SCHEME) ||
scheme.equalsIgnoreCase(DIGEST_SCHEME)) {
return id.split(":")[0];
}
return "unknown";
}
private CuratorFrameworkFactory.Builder getBuilder(HAConfiguration.ZookeeperProperties zookeeperProperties) {
return CuratorFrameworkFactory.builder().
connectString(zookeeperProperties.getConnectString()).
sessionTimeoutMs(zookeeperProperties.getSessionTimeout()).
retryPolicy(new ExponentialBackoffRetry(
zookeeperProperties.getRetriesSleepTimeMillis(), zookeeperProperties.getNumRetries()));
}
/**
* Cleanup resources related to {@link CuratorFramework}.
*
* After this call, no further calls to any curator objects should be done.
*/
public void close() {
curatorFramework.close();
}
/**
* Returns a pre-created instance of {@link CuratorFramework}.
*
* This method can be called any number of times to access the {@link CuratorFramework} used in the
* application.
* @return
*/
public CuratorFramework clientInstance() {
return curatorFramework;
}
/**
* Create a new instance {@link LeaderLatch}
* @param serverId the ID used to register this instance with curator.
* This ID should typically be obtained using
* {@link org.apache.atlas.ha.AtlasServerIdSelector#selectServerId(Configuration)}
* @param zkRoot the root znode under which the leader latch node is added.
* @return
*/
public LeaderLatch leaderLatchInstance(String serverId, String zkRoot) {
return new LeaderLatch(curatorFramework, zkRoot+APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId);
}
public InterProcessMutex lockInstance(String zkRoot) {
return new InterProcessMutex(curatorFramework, zkRoot+ SETUP_LOCK);
}
}