blob: 6cdada300c32ab524da273db89d9f2f63991955e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.util;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.login.Configuration;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.oozie.ErrorCode;
import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;
import org.apache.oozie.event.listener.ZKConnectionListener;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
/**
 * This class provides a singleton for interacting with ZooKeeper that other classes can use. It handles connecting to ZooKeeper,
 * service discovery, and publishing metadata about this server.
 * <p>
 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton. This will ensure that we're
 * properly connected and ready to go with ZooKeeper. When the user is done (i.e. on shutdown), it should call
 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
 * remove itself from ZooKeeper.
 * <p>
 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers. To keep things simple and to make it easy
 * to add additional metadata in the future, we share a Map. The keys are defined in {@link ZKMetadataKeys}.
 * <p>
 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
 * /oozie/services/). ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload. For example, with the default settings,
 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
 * <p>
 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
 * "service" will be used for the ACLs).
 * <p>
 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}.
 * <p>
 * Registration bookkeeping ({@link #register(Object)} / {@link #unregister(Object)}) is guarded by the {@code ZKUtils.class}
 * monitor.
 */
public class ZKUtils {
    /**
     * oozie-site property for specifying the ZooKeeper connection string. Comma-separated values of host:port pairs of the
     * ZooKeeper servers.
     */
    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";

    /**
     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie"). All of the Oozie servers that are planning
     * on talking to each other should have the same value for this.
     */
    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";

    /**
     * oozie-site property for specifying the ZooKeeper connection timeout, in seconds.
     */
    public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout";

    /**
     * oozie-site property for specifying the ZooKeeper session timeout, in seconds (300 when unset). If the connection is lost
     * after retrying, then the Oozie server will shut itself down.
     */
    public static final String ZK_SESSION_TIMEOUT = "oozie.zookeeper.session.timeout";

    /**
     * oozie-site property for specifying the maximum number of times to retry a ZooKeeper operation (10 when unset).
     */
    public static final String ZK_MAX_RETRIES = "oozie.zookeeper.max.retries";

    /**
     * oozie-env environment variable for specifying the Oozie instance ID. When empty, the value of "oozie.http.hostname" is
     * used instead.
     */
    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";

    /**
     * oozie-site property for specifying that ZooKeeper is secure.
     */
    public static final String ZK_SECURE = "oozie.zookeeper.secure";

    private static final String ZK_OOZIE_SERVICE = "servers";

    /**
     * Services that need to put a node in zookeeper should go under here. Try to keep this area clean and organized.
     */
    public static final String ZK_BASE_SERVICES_PATH = "/services";

    // Objects currently registered; guarded by the ZKUtils.class monitor.
    private static final Set<Object> users = new HashSet<Object>();

    // The singleton; guarded by the ZKUtils.class monitor.
    private static ZKUtils zk = null;

    private CuratorFramework client = null;
    private String zkId;
    private long zkRegTime;
    private ServiceDiscovery<Map> sDiscovery;
    private ServiceCache<Map> sCache;
    private List<ACL> saslACL;
    private XLog log;

    /**
     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
     * <p>
     * If any step fails, whatever was already opened (client, service discovery, service cache) is closed before the exception
     * is rethrown, because a half-built instance is never published and nobody else could release it.
     *
     * @throws Exception if connecting to ZooKeeper, advertising this server, or fixing ACLs fails
     */
    private ZKUtils() throws Exception {
        log = XLog.getLog(getClass());
        zkId = ConfigurationService.get(OOZIE_INSTANCE_ID);
        if (zkId == null || zkId.isEmpty()) {
            zkId = ConfigurationService.get("oozie.http.hostname");
        }
        try {
            createClient();
            advertiseService();
            checkAndSetACLs();
        } catch (Exception e) {
            releaseAfterFailedInit();
            throw e;
        }
    }

    /**
     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
     *
     * @param user The calling class
     * @return the ZKUtils singleton
     * @throws Exception if there is an issue when connecting to ZooKeeper
     */
    public static synchronized ZKUtils register(Object user) throws Exception {
        if (zk == null) {
            zk = new ZKUtils();
        }
        // Remember the calling class so we can disconnect when everybody is done
        users.add(user);
        return zk;
    }

    /**
     * Classes should call this when they are done (i.e. shutdown). When the last registered user leaves, the singleton is torn
     * down and cleared.
     *
     * @param user The calling class
     */
    public void unregister(Object user) {
        // Must use the same monitor as the static synchronized register(); a synchronized instance method would lock 'this'
        // instead and race with register() on the shared static state.
        synchronized (ZKUtils.class) {
            users.remove(user);
            // If there are no more classes using ZooKeeper, we should teardown everything.
            if (users.isEmpty() && zk != null) {
                zk.teardown();
                zk = null;
            }
        }
    }

    /**
     * Builds and starts the Curator client from the oozie-site settings, configuring SASL/Kerberos and 'sasl' ACLs when
     * {@link #ZK_SECURE} is true.
     */
    private void createClient() throws Exception {
        // Connect to the ZooKeeper server
        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
        String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING);
        String zkNamespace = getZKNameSpace();
        int zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT);
        int zkSessionTimeout = ConfigurationService.getInt(ZK_SESSION_TIMEOUT, 300);

        ACLProvider aclProvider;
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
            setJaasConfiguration();
            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
            aclProvider = new SASLOwnerACLProvider();
        } else {
            log.info("Connecting to ZooKeeper without authentication");
            aclProvider = new DefaultACLProvider(); // open to everyone
        }
        client = CuratorFrameworkFactory.builder()
                .namespace(zkNamespace)
                .connectString(zkConnectionString)
                .retryPolicy(retryPolicy)
                .aclProvider(aclProvider)
                .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms
                .sessionTimeoutMs(zkSessionTimeout * 1000) // in ms
                .build();
        client.start();
        client.getConnectionStateListenable().addListener(new ZKConnectionListener());
    }

    /**
     * Registers this server under {@link #ZK_BASE_SERVICES_PATH}, starts the service cache, and records this server's
     * registration time.
     */
    private void advertiseService() throws Exception {
        // Advertise on the service discovery
        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                .basePath(ZK_BASE_SERVICES_PATH)
                .client(client)
                .serializer(instanceSerializer)
                .build();
        sDiscovery.start();
        sDiscovery.registerService(getMetadataInstance());

        // Create the service discovery cache
        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
        sCache.start();

        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
    }

    /**
     * Stops the service cache, unregisters this server and closes the service discovery. Each later step still runs even if an
     * earlier one throws, so a failing cache close does not leave the discovery open.
     */
    private void unadvertiseService() throws Exception {
        try {
            // Stop the service discovery cache
            sCache.close();
        } finally {
            try {
                // Unadvertise on the service discovery
                sDiscovery.unregisterService(getMetadataInstance());
            } finally {
                sDiscovery.close();
            }
        }
    }

    /**
     * Unadvertises this server (logging, not propagating, any failure) and closes the Curator client.
     */
    private void teardown() {
        try {
            unadvertiseService();
        } catch (Exception ex) {
            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
        }
        client.close();
        client = null;
    }

    /**
     * Best-effort cleanup used when the constructor fails part-way; closes whatever was opened and logs close failures so they
     * do not mask the original exception.
     */
    private void releaseAfterFailedInit() {
        if (sCache != null) {
            try {
                sCache.close();
            } catch (Exception ex) {
                log.warn("Exception occurred while closing the service cache: " + ex.getMessage(), ex);
            }
        }
        if (sDiscovery != null) {
            try {
                sDiscovery.close();
            } catch (Exception ex) {
                log.warn("Exception occurred while closing the service discovery: " + ex.getMessage(), ex);
            }
        }
        if (client != null) {
            try {
                client.close();
            } catch (Exception ex) {
                log.warn("Exception occurred while closing the ZooKeeper client: " + ex.getMessage(), ex);
            }
            client = null;
        }
    }

    /**
     * Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers: its ID and its effective URL.
     */
    private ServiceInstance<Map> getMetadataInstance() throws Exception {
        String url = ConfigUtils.getOozieEffectiveUrl();
        Map<String, String> map = new HashMap<String, String>();
        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
        map.put(ZKMetadataKeys.OOZIE_URL, url);
        return ServiceInstance.<Map>builder()
                .name(ZK_OOZIE_SERVICE)
                .id(zkId)
                .payload(map)
                .build();
    }

    /**
     * Returns a list of the metadata provided by all of the Oozie Servers. Note that the metadata is cached so it may be a second
     * or two stale.
     *
     * @return a List of the metadata provided by all of the Oozie Servers, or {@code null} if the service cache was never created
     */
    public List<ServiceInstance<Map>> getAllMetaData() {
        List<ServiceInstance<Map>> instances = null;
        if (sCache != null) {
            instances = sCache.getInstances();
        }
        return instances;
    }

    /**
     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
     *
     * @return the ID of this Oozie Server
     */
    public String getZKId() {
        return zkId;
    }

    /**
     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
     * more direct operations on ZooKeeper. Most of the time, this shouldn't be needed.
     * <p>
     * Be careful not to close the connection.
     *
     * @return the CuratorFramework object
     */
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     *
     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling
     * {@link ZKUtils#getAllMetaData()})
     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     */
    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
        int index = 0;
        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
        for (ServiceInstance<Map> oozie : oozies) {
            long otherRegTime = oozie.getRegistrationTimeUTC();
            if (otherRegTime < zkRegTime) {
                index++;
            }
        }
        return index;
    }

    /**
     * When security is enabled, converts the namespace znode and everything beneath it to 'sasl' ACLs if the namespace does not
     * already carry them (e.g. znodes created before security was turned on).
     */
    private void checkAndSetACLs() throws Exception {
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
            // and set the ACLs for them
            // We can't get the namespace znode through curator; have to go through zk client
            String namespace = "/" + client.getNamespace();
            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
                // An empty ACL list also means 'sasl' is not set; guard it instead of failing on get(0)
                if (acls.isEmpty() || !"sasl".equals(acls.get(0).getId().getScheme())) {
                    log.info("'sasl' ACLs not set; setting...");
                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
                    for (String child : children) {
                        checkAndSetACLs("/" + child);
                    }
                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
                }
            }
        }
    }

    /**
     * Recursively applies {@link #saslACL} to {@code path} (relative to the Curator namespace), children first.
     */
    private void checkAndSetACLs(String path) throws Exception {
        List<String> children = client.getChildren().forPath(path);
        for (String child : children) {
            checkAndSetACLs(path + "/" + child);
        }
        client.setACL().withACL(saslACL).forPath(path);
    }

    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
    private void setJaasConfiguration() throws ServiceException, IOException {
        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
        if (keytabFile.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
        }
        String principal = getKerberosPrincipal();
        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
        // point to it (but this way we don't have to write a file, and it works better for the tests)
        JaasConfiguration.addEntry("Client", principal, keytabFile);
        Configuration.setConfiguration(JaasConfiguration.getInstance());
    }

    /**
     * Returns the short service name of the Kerberos principal (e.g. "oozie" for "oozie/host@REALM"), used as the 'sasl' ACL id.
     */
    private String getServicePrincipal() throws ServiceException {
        return getKerberosPrincipal().split("[/@]")[0];
    }

    /**
     * Reads the configured Kerberos principal, failing with E0026 if it is empty.
     */
    private String getKerberosPrincipal() throws ServiceException {
        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
        if (principal.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
        }
        return principal;
    }

    /**
     * Useful for tests to get the registered classes
     *
     * @return the set of registered classes
     */
    @VisibleForTesting
    public static Set<Object> getUsers() {
        return users;
    }

    /**
     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
     */
    public abstract static class ZKMetadataKeys {
        /**
         * The ID of the Oozie Server
         */
        public static final String OOZIE_ID = "OOZIE_ID";
        /**
         * The URL of the Oozie Server
         */
        public static final String OOZIE_URL = "OOZIE_URL";
    }

    /**
     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
     */
    public class SASLOwnerACLProvider implements ACLProvider {
        @Override
        public List<ACL> getDefaultAcl() {
            return saslACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return saslACL;
        }
    }

    /**
     * Returns an exponential-backoff retry policy with a 1000 ms base sleep and {@link #ZK_MAX_RETRIES} retries (10 when unset).
     *
     * @return RetryPolicy
     */
    public static RetryPolicy getRetryPolicy() {
        return new ExponentialBackoffRetry(1000, ConfigurationService.getInt(ZK_MAX_RETRIES, 10));
    }

    /**
     * Returns the configured ZooKeeper namespace.
     *
     * @return the value of oozie.zookeeper.namespace
     */
    public static String getZKNameSpace() {
        return ConfigurationService.get(ZK_NAMESPACE);
    }
}