// blob: 02b4552fc1fc1d0a294f9ea89b315c9a47fc4b15 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryPolicy;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.imps.GzipCompressionProvider;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
 * Factory methods for creating framework-style clients.
 * <p>
 * Use one of the {@code newClient} overloads for the common cases, or {@link #builder()} when
 * more options need to be set. This class only has static members and cannot be instantiated.
 */
public class CuratorFrameworkFactory
{
    /** Default session timeout; overridable through the {@code curator-default-session-timeout} system property. */
    private static final int DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);

    /** Default connection timeout; overridable through the {@code curator-default-connection-timeout} system property. */
    private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);

    /** Local address bytes, resolved once at class initialization and used as the default node payload. */
    private static final byte[] LOCAL_ADDRESS = getLocalAddress();

    private static final CompressionProvider DEFAULT_COMPRESSION_PROVIDER = new GzipCompressionProvider();

    private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();

    private static final DefaultACLProvider DEFAULT_ACL_PROVIDER = new DefaultACLProvider();

    /** Inactivity threshold used by {@link Builder#buildTemp()}: 3 minutes, in milliseconds. */
    private static final long DEFAULT_INACTIVE_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(3);

    /** Default for {@link Builder#maxCloseWaitMs(int)}: 1 second, in milliseconds. */
    private static final int DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1);

    private static final boolean DEFAULT_WITH_ENSEMBLE_TRACKER = true;

    /**
     * Return a new builder that builds a CuratorFramework
     *
     * @return new builder
     */
    public static Builder builder()
    {
        return new Builder();
    }

    /**
     * Create a new client with default session timeout and default connection timeout
     *
     * @param connectString list of servers to connect to
     * @param retryPolicy retry policy to use
     * @return client
     */
    public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
    {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
    }

    /**
     * Create a new client
     *
     * @param connectString list of servers to connect to
     * @param sessionTimeoutMs session timeout
     * @param connectionTimeoutMs connection timeout
     * @param retryPolicy retry policy to use
     * @return client
     */
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }

    /**
     * Create a new client
     *
     * @param connectString list of servers to connect to
     * @param sessionTimeoutMs session timeout
     * @param connectionTimeoutMs connection timeout
     * @param retryPolicy retry policy to use
     * @param zkClientConfig ZKClientConfig
     * @return client
     *
     * @since 5.1.1, supported from ZooKeeper 3.6.1 and above.
     */
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            zkClientConfig(zkClientConfig).
            build();
    }

    /**
     * Return the local address as bytes that can be used as a node payload.
     * The textual host address is encoded as UTF-8 so that the payload does not depend on the
     * JVM's default charset.
     *
     * @return local address bytes, or an empty array if the local host cannot be resolved
     */
    public static byte[] getLocalAddress()
    {
        try
        {
            return InetAddress.getLocalHost().getHostAddress().getBytes(StandardCharsets.UTF_8);
        }
        catch ( UnknownHostException ignore )
        {
            // best effort only: an unresolvable local host yields an empty payload instead of a failure
        }
        return new byte[0];
    }

    /**
     * Fluent builder that collects the settings for a {@link CuratorFramework} (see {@link #build()})
     * or a {@link CuratorTempFramework} (see {@link #buildTemp()}). Instances are obtained from
     * {@link CuratorFrameworkFactory#builder()}. Each setter returns {@code this} so calls can be chained;
     * the getters expose the current values to the framework implementation.
     */
    public static class Builder
    {
        private EnsembleProvider ensembleProvider;
        private boolean withEnsembleTracker = DEFAULT_WITH_ENSEMBLE_TRACKER;
        private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
        private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
        private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
        private RetryPolicy retryPolicy;
        private ThreadFactory threadFactory = null;
        private String namespace;
        private List<AuthInfo> authInfos = null;
        // NOTE: by default this is the shared static array; it must never be handed out without copying
        private byte[] defaultData = LOCAL_ADDRESS;
        private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
        private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
        private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
        private boolean canBeReadOnly = false;
        private boolean useContainerParentsIfAvailable = true;
        private ConnectionStateErrorPolicy connectionStateErrorPolicy = new StandardConnectionStateErrorPolicy();
        private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
        private int waitForShutdownTimeoutMs = 0;
        private Executor runSafeService = null;
        private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
        private int simulatedSessionExpirationPercent = 100;
        private ZKClientConfig zkClientConfig;

        /**
         * Apply the current values and build a new CuratorFramework
         *
         * @return new CuratorFramework
         */
        public CuratorFramework build()
        {
            return new CuratorFrameworkImpl(this);
        }

        /**
         * Apply the current values and build a new temporary CuratorFramework. Temporary
         * CuratorFramework instances are meant for single requests to ZooKeeper ensembles
         * over a failure prone network such as a WAN. The APIs available from {@link CuratorTempFramework}
         * are limited. Further, the connection will be closed after 3 minutes of inactivity.
         *
         * @return temp instance
         */
        public CuratorTempFramework buildTemp()
        {
            return buildTemp(DEFAULT_INACTIVE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
        }

        /**
         * Apply the current values and build a new temporary CuratorFramework. Temporary
         * CuratorFramework instances are meant for single requests to ZooKeeper ensembles
         * over a failure prone network such as a WAN. The APIs available from {@link CuratorTempFramework}
         * are limited. Further, the connection will be closed after the given period of inactivity.
         *
         * @param inactiveThreshold amount of inactivity, expressed in {@code unit}, that causes the connection to close
         * @param unit unit of {@code inactiveThreshold}; the value is converted to milliseconds
         * @return temp instance
         */
        public CuratorTempFramework buildTemp(long inactiveThreshold, TimeUnit unit)
        {
            return new CuratorTempFrameworkImpl(this, unit.toMillis(inactiveThreshold));
        }

        /**
         * Set a single connection authorization.
         * <p>
         * This replaces any authorization configured by an earlier call to this method or to
         * {@link #authorization(List)}. The {@code auth} bytes are copied.
         *
         * @param scheme the scheme
         * @param auth the auth bytes, may be {@code null}
         * @return this
         */
        public Builder authorization(String scheme, byte[] auth)
        {
            return authorization(ImmutableList.of(new AuthInfo(scheme, (auth != null) ? Arrays.copyOf(auth, auth.length) : null)));
        }

        /**
         * Set the connection authorizations.
         * <p>
         * The supplied list <b>replaces</b> any authorization configured by an earlier call to this
         * method or to {@link #authorization(java.lang.String, byte[])}; entries are not accumulated
         * across calls. An immutable copy of the list is stored.
         *
         * @param authInfos list of {@link AuthInfo} objects with scheme and auth
         * @return this
         * @throws NullPointerException if {@code authInfos} or any of its elements is {@code null}
         */
        public Builder authorization(List<AuthInfo> authInfos)
        {
            this.authInfos = ImmutableList.copyOf(authInfos);
            return this;
        }

        /**
         * Set the list of servers to connect to. IMPORTANT: use either this or {@link #ensembleProvider(EnsembleProvider)}
         * but not both.
         *
         * @param connectString list of servers to connect to
         * @return this
         */
        public Builder connectString(String connectString)
        {
            ensembleProvider = new FixedEnsembleProvider(connectString);
            return this;
        }

        /**
         * Set the list ensemble provider. IMPORTANT: use either this or {@link #connectString(String)}
         * but not both.
         *
         * @param ensembleProvider the ensemble provider to use
         * @return this
         */
        public Builder ensembleProvider(EnsembleProvider ensembleProvider)
        {
            this.ensembleProvider = ensembleProvider;
            return this;
        }

        /**
         * Allows to configure if the ensemble configuration changes will be watched.
         * The default value is {@code true}.<br>
         *
         * IMPORTANT: Use this method in combination with {@link #ensembleProvider(EnsembleProvider)} to provide
         * an instance that returns {@code false} on {@link EnsembleProvider#updateServerListEnabled()} in order
         * to fully achieve that ensemble server list changes are ignored<br>
         *
         * @param withEnsembleTracker use {@code false} if you want to avoid following ensemble configuration changes
         * @return this
         */
        public Builder ensembleTracker(boolean withEnsembleTracker)
        {
            this.withEnsembleTracker = withEnsembleTracker;
            return this;
        }

        /**
         * @return {@code true} if ensemble configuration changes MUST be watched
         */
        public boolean withEnsembleTracker()
        {
            return withEnsembleTracker;
        }

        /**
         * Sets the data to use when {@link PathAndBytesable#forPath(String)} is used.
         * This is useful for debugging purposes. For example, you could set this to be the IP of the
         * client. The supplied array is copied.
         *
         * @param defaultData new default data to use, may be {@code null}
         * @return this
         */
        public Builder defaultData(byte[] defaultData)
        {
            this.defaultData = (defaultData != null) ? Arrays.copyOf(defaultData, defaultData.length) : null;
            return this;
        }

        /**
         * As ZooKeeper is a shared space, users of a given cluster should stay within
         * a pre-defined namespace. If a namespace is set here, all paths will get pre-pended
         * with the namespace
         *
         * @param namespace the namespace
         * @return this
         */
        public Builder namespace(String namespace)
        {
            this.namespace = namespace;
            return this;
        }

        /**
         * @param sessionTimeoutMs session timeout
         * @return this
         */
        public Builder sessionTimeoutMs(int sessionTimeoutMs)
        {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }

        /**
         * @param connectionTimeoutMs connection timeout
         * @return this
         */
        public Builder connectionTimeoutMs(int connectionTimeoutMs)
        {
            this.connectionTimeoutMs = connectionTimeoutMs;
            return this;
        }

        /**
         * @param maxCloseWaitMs time to wait during close to join background threads
         * @return this
         */
        public Builder maxCloseWaitMs(int maxCloseWaitMs)
        {
            this.maxCloseWaitMs = maxCloseWaitMs;
            return this;
        }

        /**
         * @param retryPolicy retry policy to use
         * @return this
         */
        public Builder retryPolicy(RetryPolicy retryPolicy)
        {
            this.retryPolicy = retryPolicy;
            return this;
        }

        /**
         * @param threadFactory thread factory used to create Executor Services
         * @return this
         */
        public Builder threadFactory(ThreadFactory threadFactory)
        {
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * @param compressionProvider the compression provider
         * @return this
         */
        public Builder compressionProvider(CompressionProvider compressionProvider)
        {
            this.compressionProvider = compressionProvider;
            return this;
        }

        /**
         * @param zookeeperFactory the zookeeper factory to use
         * @return this
         */
        public Builder zookeeperFactory(ZookeeperFactory zookeeperFactory)
        {
            this.zookeeperFactory = zookeeperFactory;
            return this;
        }

        /**
         * @param aclProvider a provider for ACLs
         * @return this
         */
        public Builder aclProvider(ACLProvider aclProvider)
        {
            this.aclProvider = aclProvider;
            return this;
        }

        /**
         * @param canBeReadOnly if true, allow ZooKeeper client to enter
         *                      read only mode in case of a network partition. See
         *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
         *                      for details
         * @return this
         */
        public Builder canBeReadOnly(boolean canBeReadOnly)
        {
            this.canBeReadOnly = canBeReadOnly;
            return this;
        }

        /**
         * By default, Curator uses {@link CreateBuilder#creatingParentContainersIfNeeded()}
         * if the ZK JAR supports {@link CreateMode#CONTAINER}. Call this method to turn off this behavior.
         *
         * @return this
         */
        public Builder dontUseContainerParents()
        {
            this.useContainerParentsIfAvailable = false;
            return this;
        }

        /**
         * Set the error policy to use. The default is {@link StandardConnectionStateErrorPolicy}
         *
         * @since 3.0.0
         * @param connectionStateErrorPolicy new error policy
         * @return this
         */
        public Builder connectionStateErrorPolicy(ConnectionStateErrorPolicy connectionStateErrorPolicy)
        {
            this.connectionStateErrorPolicy = connectionStateErrorPolicy;
            return this;
        }

        /**
         * Set a timeout for {@link CuratorZookeeperClient#close(int)}.
         * The default is 0, which means that this feature is disabled.
         *
         * @since 4.0.2
         * @param waitForShutdownTimeoutMs default timeout
         * @return this
         */
        public Builder waitForShutdownTimeoutMs(int waitForShutdownTimeoutMs)
        {
            this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
            return this;
        }

        /**
         * <p>
         * Prior to 3.0.0, Curator did not try to manage session expiration
         * other than the functionality provided by ZooKeeper itself. Starting with
         * 3.0.0, Curator has the option of attempting to monitor session expiration
         * above what is provided by ZooKeeper. The percentage set by this method
         * determines how Curator will check for session expiration.
         * </p>
         *
         * <p>
         * The default percentage is 100.
         * </p>
         *
         * <p>
         * The value must be in the range 1 to 100 inclusive. Any other value, including
         * {@code 0}, is rejected by this method.
         * </p>
         *
         * <p>
         * Curator will check for session expiration
         * as follows: when ZooKeeper sends a Disconnect event, Curator will start a timer.
         * If re-connection is not achieved before the elapsed time exceeds the negotiated
         * session time multiplied by the session expiration percent, Curator will simulate
         * a session expiration. Due to timing/network issues, it is <b>not possible</b> for
         * a client to match the server's session timeout with complete accuracy. Thus, the need
         * for a session expiration percentage.
         * </p>
         *
         * @param simulatedSessionExpirationPercent new simulated session expiration percentage, 1..100
         * @return this
         * @throws IllegalArgumentException if the value is not greater than 0 and at most 100
         * @since 5.0
         */
        public Builder simulatedSessionExpirationPercent(int simulatedSessionExpirationPercent)
        {
            Preconditions.checkArgument(
                (simulatedSessionExpirationPercent > 0) && (simulatedSessionExpirationPercent <= 100),
                "simulatedSessionExpirationPercent must be > 0 and <= 100");
            this.simulatedSessionExpirationPercent = simulatedSessionExpirationPercent;
            return this;
        }

        /**
         * Set the ZooKeeper client configuration. This is the setter used by the
         * {@code newClient} overload of {@link CuratorFrameworkFactory} that accepts a {@link ZKClientConfig}.
         *
         * @param zkClientConfig the client configuration; stored as given
         * @return this
         */
        public Builder zkClientConfig(ZKClientConfig zkClientConfig)
        {
            this.zkClientConfig = zkClientConfig;
            return this;
        }

        /**
         * Add an enforced schema set
         *
         * @param schemaSet the schema set
         * @return this
         * @since 3.2.0
         */
        public Builder schemaSet(SchemaSet schemaSet)
        {
            this.schemaSet = schemaSet;
            return this;
        }

        /**
         * Curator (and user) recipes will use this executor to call notifyAll
         * and other blocking calls that might normally block ZooKeeper's event thread.
         * By default, an executor is allocated internally using the provided (or default)
         * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method
         * to set a custom executor.
         *
         * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc
         * @return this
         * @since 4.1.0
         */
        public Builder runSafeService(Executor runSafeService)
        {
            this.runSafeService = runSafeService;
            return this;
        }

        /**
         * Sets the connection state listener manager factory. For example,
         * you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
         *
         * @param connectionStateListenerManagerFactory manager factory to use
         * @return this
         * @throws NullPointerException if {@code connectionStateListenerManagerFactory} is {@code null}
         * @since 4.2.0
         */
        public Builder connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory)
        {
            this.connectionStateListenerManagerFactory = Objects.requireNonNull(connectionStateListenerManagerFactory, "connectionStateListenerManagerFactory cannot be null");
            return this;
        }

        /**
         * @return the executor set via {@link #runSafeService(Executor)}, or {@code null} if none was set
         */
        public Executor getRunSafeService()
        {
            return runSafeService;
        }

        /**
         * @return the configured ACL provider
         */
        public ACLProvider getAclProvider()
        {
            return aclProvider;
        }

        /**
         * @return the configured ZooKeeper factory
         */
        public ZookeeperFactory getZookeeperFactory()
        {
            return zookeeperFactory;
        }

        /**
         * @return the configured compression provider
         */
        public CompressionProvider getCompressionProvider()
        {
            return compressionProvider;
        }

        /**
         * @return the thread factory set via {@link #threadFactory(ThreadFactory)}, or {@code null} if none was set
         */
        public ThreadFactory getThreadFactory()
        {
            return threadFactory;
        }

        /**
         * @return the ensemble provider set directly or created by {@link #connectString(String)};
         *         {@code null} if neither was called
         */
        public EnsembleProvider getEnsembleProvider()
        {
            return ensembleProvider;
        }

        /**
         * @return the session timeout in milliseconds
         */
        public int getSessionTimeoutMs()
        {
            return sessionTimeoutMs;
        }

        /**
         * @return the connection timeout in milliseconds
         */
        public int getConnectionTimeoutMs()
        {
            return connectionTimeoutMs;
        }

        /**
         * @return the value set via {@link #waitForShutdownTimeoutMs(int)}; 0 by default
         */
        public int getWaitForShutdownTimeoutMs()
        {
            return waitForShutdownTimeoutMs;
        }

        /**
         * @return the value set via {@link #maxCloseWaitMs(int)}
         */
        public int getMaxCloseWaitMs()
        {
            return maxCloseWaitMs;
        }

        /**
         * @return the retry policy, or {@code null} if none was set
         */
        public RetryPolicy getRetryPolicy()
        {
            return retryPolicy;
        }

        /**
         * @return the namespace, or {@code null} if none was set
         */
        public String getNamespace()
        {
            return namespace;
        }

        /**
         * @return {@code true} unless {@link #dontUseContainerParents()} has been called
         */
        public boolean useContainerParentsIfAvailable()
        {
            return useContainerParentsIfAvailable;
        }

        /**
         * @return the configured connection state error policy
         */
        public ConnectionStateErrorPolicy getConnectionStateErrorPolicy()
        {
            return connectionStateErrorPolicy;
        }

        /**
         * @return the simulated session expiration percentage; 100 by default
         */
        public int getSimulatedSessionExpirationPercent()
        {
            return simulatedSessionExpirationPercent;
        }

        /**
         * @return the ZooKeeper client configuration, or {@code null} if none was set
         */
        public ZKClientConfig getZkClientConfig()
        {
            return zkClientConfig;
        }

        /**
         * @return the configured schema set
         */
        public SchemaSet getSchemaSet()
        {
            return schemaSet;
        }

        /**
         * Return the scheme of the single configured authorization.
         *
         * @return the scheme, or {@code null} if no authorization has been configured
         * @throws IllegalStateException if more than one authorization has been configured
         * @deprecated only works for at most one authorization entry; see {@link #getAuthInfos()}
         */
        @Deprecated
        public String getAuthScheme()
        {
            int qty = (authInfos != null) ? authInfos.size() : 0;
            switch ( qty )
            {
                case 0:
                {
                    return null;
                }

                case 1:
                {
                    return authInfos.get(0).scheme;
                }

                default:
                {
                    throw new IllegalStateException("More than 1 auth has been added");
                }
            }
        }

        /**
         * Return a copy of the auth bytes of the single configured authorization.
         *
         * @return a copy of the auth bytes, or {@code null} if no authorization has been configured
         *         or its auth bytes are {@code null}
         * @throws IllegalStateException if more than one authorization has been configured
         * @deprecated only works for at most one authorization entry; see {@link #getAuthInfos()}
         */
        @Deprecated
        public byte[] getAuthValue()
        {
            int qty = (authInfos != null) ? authInfos.size() : 0;
            switch ( qty )
            {
                case 0:
                {
                    return null;
                }

                case 1:
                {
                    byte[] bytes = authInfos.get(0).getAuth();
                    return (bytes != null) ? Arrays.copyOf(bytes, bytes.length) : null;
                }

                default:
                {
                    throw new IllegalStateException("More than 1 auth has been added");
                }
            }
        }

        /**
         * @return the immutable list of configured authorizations, or {@code null} if none was configured
         */
        public List<AuthInfo> getAuthInfos()
        {
            return authInfos;
        }

        /**
         * Return the default node data. A copy is returned so that callers cannot modify the
         * builder's state, which by default is an array shared by every builder.
         *
         * @return a copy of the default data, or {@code null} if it was explicitly set to {@code null}
         */
        public byte[] getDefaultData()
        {
            return (defaultData != null) ? Arrays.copyOf(defaultData, defaultData.length) : null;
        }

        /**
         * @return the value set via {@link #canBeReadOnly(boolean)}; {@code false} by default
         */
        public boolean canBeReadOnly()
        {
            return canBeReadOnly;
        }

        /**
         * @return the configured connection state listener manager factory, never {@code null}
         */
        public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFactory()
        {
            return connectionStateListenerManagerFactory;
        }

        // instances are created through CuratorFrameworkFactory.builder()
        private Builder()
        {
        }
    }

    // static factory holder; not instantiable
    private CuratorFrameworkFactory()
    {
    }
}