| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.registry.server.services; |
| |
| import com.google.common.base.Preconditions; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.registry.client.api.RegistryConstants; |
| import org.apache.hadoop.registry.client.impl.zk.BindingInformation; |
| import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource; |
| import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; |
| import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; |
| import org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions; |
| import org.apache.zookeeper.server.ServerCnxnFactory; |
| import org.apache.zookeeper.server.ZooKeeperServer; |
| import org.apache.zookeeper.server.persistence.FileTxnSnapLog; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| |
| /** |
| * This is a small, localhost Zookeeper service instance that is contained |
| * in a YARN service...it's been derived from Apache Twill. |
| * <p> |
| * It implements {@link RegistryBindingSource} and provides binding information, |
| * <i>once started</i>. Until {@link #start()} is called, the hostname and |
| * port may be undefined. Accordingly, the service raises an exception in this |
| * condition. |
| * <p> |
| * If you wish to chain together a registry service with this one under |
| * the same {@code CompositeService}, this service must be added |
| * as a child first. |
| * <p> |
| * It also sets the configuration parameter |
| * {@link RegistryConstants#KEY_REGISTRY_ZK_QUORUM} |
| * to its connection string. Any code with access to the service configuration |
| * can view it. |
| */ |
| @InterfaceStability.Evolving |
| public class MicroZookeeperService |
| extends AbstractService |
| implements RegistryBindingSource, RegistryConstants, |
| ZookeeperConfigOptions, |
| MicroZookeeperServiceKeys{ |
| |
| |
| private static final Logger |
| LOG = LoggerFactory.getLogger(MicroZookeeperService.class); |
| |
| private File instanceDir; |
| private File dataDir; |
| private int tickTime; |
| private int port; |
| private String host; |
| private boolean secureServer; |
| |
| private ServerCnxnFactory factory; |
| private BindingInformation binding; |
| private File confDir; |
| private StringBuilder diagnostics = new StringBuilder(); |
| |
| /** |
| * Create an instance |
| * @param name service name |
| */ |
| public MicroZookeeperService(String name) { |
| super(name); |
| } |
| |
| /** |
| * Get the connection string. |
| * @return the string |
| * @throws IllegalStateException if the connection is not yet valid |
| */ |
| public String getConnectionString() { |
| Preconditions.checkState(factory != null, "service not started"); |
| InetSocketAddress addr = factory.getLocalAddress(); |
| return String.format("%s:%d", addr.getHostName(), addr.getPort()); |
| } |
| |
| /** |
| * Get the connection address |
| * @return the connection as an address |
| * @throws IllegalStateException if the connection is not yet valid |
| */ |
| public InetSocketAddress getConnectionAddress() { |
| Preconditions.checkState(factory != null, "service not started"); |
| return factory.getLocalAddress(); |
| } |
| |
| /** |
| * Create an inet socket addr from the local host + port number |
| * @param port port to use |
| * @return a (hostname, port) pair |
| * @throws UnknownHostException if the server cannot resolve the host |
| */ |
| private InetSocketAddress getAddress(int port) throws UnknownHostException { |
| return new InetSocketAddress(host, port < 0 ? 0 : port); |
| } |
| |
| /** |
| * Initialize the service, including choosing a path for the data |
| * @param conf configuration |
| * @throws Exception |
| */ |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| port = conf.getInt(KEY_ZKSERVICE_PORT, 0); |
| tickTime = conf.getInt(KEY_ZKSERVICE_TICK_TIME, |
| ZooKeeperServer.DEFAULT_TICK_TIME); |
| String instancedirname = conf.getTrimmed( |
| KEY_ZKSERVICE_DIR, ""); |
| host = conf.getTrimmed(KEY_ZKSERVICE_HOST, DEFAULT_ZKSERVICE_HOST); |
| if (instancedirname.isEmpty()) { |
| File testdir = new File(System.getProperty("test.dir", "target")); |
| instanceDir = new File(testdir, "zookeeper" + getName()); |
| } else { |
| instanceDir = new File(instancedirname); |
| FileUtil.fullyDelete(instanceDir); |
| } |
| LOG.debug("Instance directory is {}", instanceDir); |
| mkdirStrict(instanceDir); |
| dataDir = new File(instanceDir, "data"); |
| confDir = new File(instanceDir, "conf"); |
| mkdirStrict(dataDir); |
| mkdirStrict(confDir); |
| super.serviceInit(conf); |
| } |
| |
| /** |
| * Create a directory, ignoring if the dir is already there, |
| * and failing if a file or something else was at the end of that |
| * path |
| * @param dir dir to guarantee the existence of |
| * @throws IOException IO problems, or path exists but is not a dir |
| */ |
| private void mkdirStrict(File dir) throws IOException { |
| if (!dir.mkdirs()) { |
| if (!dir.isDirectory()) { |
| throw new IOException("Failed to mkdir " + dir); |
| } |
| } |
| } |
| |
| /** |
| * Append a formatted string to the diagnostics. |
| * <p> |
| * A newline is appended afterwards. |
| * @param text text including any format commands |
| * @param args arguments for the forma operation. |
| */ |
| protected void addDiagnostics(String text, Object ... args) { |
| diagnostics.append(String.format(text, args)).append('\n'); |
| } |
| |
| /** |
| * Get the diagnostics info |
| * @return the diagnostics string built up |
| */ |
| public String getDiagnostics() { |
| return diagnostics.toString(); |
| } |
| |
| /** |
| * set up security. this must be done prior to creating |
| * the ZK instance, as it sets up JAAS if that has not been done already. |
| * |
| * @return true if the cluster has security enabled. |
| */ |
| public boolean setupSecurity() throws IOException { |
| Configuration conf = getConfig(); |
| String jaasContext = conf.getTrimmed(KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT); |
| secureServer = StringUtils.isNotEmpty(jaasContext); |
| if (secureServer) { |
| RegistrySecurity.validateContext(jaasContext); |
| RegistrySecurity.bindZKToServerJAASContext(jaasContext); |
| // policy on failed auth |
| System.setProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS, |
| conf.get(KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS, |
| "true")); |
| |
| //needed so that you can use sasl: strings in the registry |
| System.setProperty(RegistryInternalConstants.ZOOKEEPER_AUTH_PROVIDER +".1", |
| RegistryInternalConstants.SASLAUTHENTICATION_PROVIDER); |
| String serverContext = |
| System.getProperty(PROP_ZK_SERVER_SASL_CONTEXT); |
| addDiagnostics("Server JAAS context s = %s", serverContext); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| /** |
| * Startup: start ZK. It is only after this that |
| * the binding information is valid. |
| * @throws Exception |
| */ |
| @Override |
| protected void serviceStart() throws Exception { |
| |
| setupSecurity(); |
| |
| ZooKeeperServer zkServer = new ZooKeeperServer(); |
| FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir); |
| zkServer.setTxnLogFactory(ftxn); |
| zkServer.setTickTime(tickTime); |
| |
| LOG.info("Starting Local Zookeeper service"); |
| factory = ServerCnxnFactory.createFactory(); |
| factory.configure(getAddress(port), -1); |
| factory.startup(zkServer); |
| |
| String connectString = getConnectionString(); |
| LOG.info("In memory ZK started at {}\n", connectString); |
| |
| if (LOG.isDebugEnabled()) { |
| StringWriter sw = new StringWriter(); |
| PrintWriter pw = new PrintWriter(sw); |
| zkServer.dumpConf(pw); |
| pw.flush(); |
| LOG.debug(sw.toString()); |
| } |
| binding = new BindingInformation(); |
| binding.ensembleProvider = new FixedEnsembleProvider(connectString); |
| binding.description = |
| getName() + " reachable at \"" + connectString + "\""; |
| |
| addDiagnostics(binding.description); |
| // finally: set the binding information in the config |
| getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString); |
| } |
| |
| /** |
| * When the service is stopped, it deletes the data directory |
| * and its contents |
| * @throws Exception |
| */ |
| @Override |
| protected void serviceStop() throws Exception { |
| if (factory != null) { |
| factory.shutdown(); |
| factory = null; |
| } |
| if (dataDir != null) { |
| FileUtil.fullyDelete(dataDir); |
| } |
| } |
| |
| @Override |
| public BindingInformation supplyBindingInformation() { |
| Preconditions.checkNotNull(binding, |
| "Service is not started: binding information undefined"); |
| return binding; |
| } |
| } |