blob: c9f51992ff9917a590614758564fd9996746ea59 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator;
import com.google.common.base.Preconditions;
import org.apache.curator.drivers.TracerDriver;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.utils.DefaultTracerDriver;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A wrapper around Zookeeper that takes care of some low-level housekeeping.
 * <p>
 * The client must be started via {@link #start()} before {@link #getZooKeeper()} or
 * {@link #blockUntilConnectedOrTimedOut()} may be used, and should be closed via
 * {@link #close()} when no longer needed. Connection handling is delegated to an
 * internal {@code ConnectionState} instance.
 */
@SuppressWarnings("UnusedDeclaration")
public class CuratorZookeeperClient implements Closeable
{
    /**
     * Upper bound, in milliseconds, for a single wait inside
     * {@link #internalBlockUntilConnectedOrTimedOut()} before the connection state is re-checked.
     */
    private static final long MAX_WAIT_SLICE_MS = 1000;

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ConnectionState state;
    private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
    private final int connectionTimeoutMs;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());

    /**
     * Creates a client for a fixed connection string using the default ZooKeeper factory.
     * Read-only mode is not enabled.
     *
     * @param connectString list of servers to connect to
     * @param sessionTimeoutMs session timeout
     * @param connectionTimeoutMs connection timeout
     * @param watcher default watcher or null
     * @param retryPolicy the retry policy to use
     */
    public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
    {
        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
    }

    /**
     * Creates a client for the given ensemble provider using the default ZooKeeper factory.
     * Read-only mode is not enabled.
     *
     * @param ensembleProvider the ensemble provider
     * @param sessionTimeoutMs session timeout
     * @param connectionTimeoutMs connection timeout
     * @param watcher default watcher or null
     * @param retryPolicy the retry policy to use
     */
    public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
    {
        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
    }

    /**
     * Creates a client with full control over the ZooKeeper factory and read-only behavior.
     * A warning is logged (but construction proceeds) when the session timeout is smaller
     * than the connection timeout.
     *
     * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
     * @param ensembleProvider the ensemble provider
     * @param sessionTimeoutMs session timeout
     * @param connectionTimeoutMs connection timeout
     * @param watcher default watcher or null
     * @param retryPolicy the retry policy to use
     * @param canBeReadOnly if true, allow ZooKeeper client to enter
     *                      read only mode in case of a network partition. See
     *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
     *                      for details
     * @throws NullPointerException if {@code retryPolicy} or {@code ensembleProvider} is null
     */
    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
    {
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            // Parameterized logging: the message is only assembled when WARN is enabled.
            log.warn("session timeout [{}] is less than connection timeout [{}]", sessionTimeoutMs, connectionTimeoutMs);
        }

        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

        this.connectionTimeoutMs = connectionTimeoutMs;
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
        setRetryPolicy(retryPolicy);
    }

    /**
     * Return the managed ZK instance.
     *
     * @return client the client
     * @throws IllegalStateException if the client has not been started
     * @throws Exception if the connection timeout has elapsed or an exception occurs in a background process
     */
    public ZooKeeper getZooKeeper() throws Exception
    {
        Preconditions.checkState(started.get(), "Client is not started");

        return state.getZooKeeper();
    }

    /**
     * Return a new retry loop. All operations should be performed in a retry loop
     *
     * @return new retry loop
     */
    public RetryLoop newRetryLoop()
    {
        return new RetryLoop(retryPolicy.get(), tracer);
    }

    /**
     * Return a new "session fail" retry loop. See {@link SessionFailRetryLoop} for details
     * on when to use it.
     *
     * @param mode failure mode
     * @return new retry loop
     */
    public SessionFailRetryLoop newSessionFailRetryLoop(SessionFailRetryLoop.Mode mode)
    {
        return new SessionFailRetryLoop(this, mode);
    }

    /**
     * Returns true if the client is currently connected
     *
     * @return true/false
     */
    public boolean isConnected()
    {
        return state.isConnected();
    }

    /**
     * This method blocks until the connection to ZK succeeds. Use with caution. The block
     * will timeout after the connection timeout (as passed to the constructor) has elapsed
     *
     * @return true if the connection succeeded, false if not
     * @throws IllegalStateException if the client has not been started
     * @throws InterruptedException interrupted while waiting
     */
    public boolean blockUntilConnectedOrTimedOut() throws InterruptedException
    {
        Preconditions.checkState(started.get(), "Client is not started");

        log.debug("blockUntilConnectedOrTimedOut() start");
        TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");

        internalBlockUntilConnectedOrTimedOut();

        trace.commit();

        boolean localIsConnected = state.isConnected();
        log.debug("blockUntilConnectedOrTimedOut() end. isConnected: {}", localIsConnected);

        return localIsConnected;
    }

    /**
     * Must be called after construction
     *
     * @throws IllegalStateException if the client has already been started
     * @throws Exception errors raised while starting the underlying connection state
     */
    public void start() throws Exception
    {
        log.debug("Starting");

        // The compare-and-set guarantees only one caller ever gets past this point.
        if ( !started.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Already started");
        }

        state.start();
    }

    /**
     * Close the client. An {@link IOException} raised while closing the underlying
     * connection state is logged rather than propagated.
     */
    public void close()
    {
        log.debug("Closing");

        started.set(false);
        try
        {
            state.close();
        }
        catch ( IOException e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("", e);
        }
    }

    /**
     * Change the retry policy
     *
     * @param policy new policy
     * @throws NullPointerException if {@code policy} is null
     */
    public void setRetryPolicy(RetryPolicy policy)
    {
        Preconditions.checkNotNull(policy, "policy cannot be null");

        retryPolicy.set(policy);
    }

    /**
     * Return the current retry policy
     *
     * @return policy
     */
    public RetryPolicy getRetryPolicy()
    {
        return retryPolicy.get();
    }

    /**
     * Start a new tracer
     *
     * @param name name of the event
     * @return the new tracer ({@link TimeTrace#commit()} must be called)
     */
    public TimeTrace startTracer(String name)
    {
        return new TimeTrace(name, tracer.get());
    }

    /**
     * Return the current tracing driver
     *
     * @return tracing driver
     */
    public TracerDriver getTracerDriver()
    {
        return tracer.get();
    }

    /**
     * Change the tracing driver
     *
     * @param tracer new tracing driver
     */
    public void setTracerDriver(TracerDriver tracer)
    {
        this.tracer.set(tracer);
    }

    /**
     * Returns the current known connection string - not guaranteed to be correct
     * value at any point in the future.
     *
     * @return connection string
     */
    public String getCurrentConnectionString()
    {
        return state.getEnsembleProvider().getConnectionString();
    }

    /**
     * Return the configured connection timeout
     *
     * @return timeout
     */
    public int getConnectionTimeoutMs()
    {
        return connectionTimeoutMs;
    }

    /**
     * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
     * is incremented.
     *
     * @return the current instance index
     */
    public long getInstanceIndex()
    {
        return state.getInstanceIndex();
    }

    /**
     * Registers an additional watcher with the underlying connection state.
     *
     * @param watcher watcher to add
     */
    void addParentWatcher(Watcher watcher)
    {
        state.addParentWatcher(watcher);
    }

    /**
     * Removes a watcher previously registered via {@link #addParentWatcher(Watcher)}.
     *
     * @param watcher watcher to remove
     */
    void removeParentWatcher(Watcher watcher)
    {
        state.removeParentWatcher(watcher);
    }

    /**
     * Blocks until the connection state reports connected or the configured connection
     * timeout has been used up, whichever comes first.
     * <p>
     * The wait is split into slices of at most {@link #MAX_WAIT_SLICE_MS} milliseconds. Each
     * slice ends early when the temporary watcher receives any event, after which the
     * connection state is re-checked. A slice never exceeds the remaining budget, so the
     * total wait does not overshoot the connection timeout.
     *
     * @throws InterruptedException interrupted while waiting
     */
    void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
    {
        long waitTimeMs = connectionTimeoutMs;
        while ( !state.isConnected() && (waitTimeMs > 0) )
        {
            final CountDownLatch latch = new CountDownLatch(1);
            Watcher tempWatcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    latch.countDown();
                }
            };

            state.addParentWatcher(tempWatcher);

            // nanoTime is monotonic, so wall-clock adjustments cannot distort the budget.
            long startNanos = System.nanoTime();
            // Never wait longer than what is left of the connection timeout.
            long sliceMs = Math.min(waitTimeMs, MAX_WAIT_SLICE_MS);
            try
            {
                latch.await(sliceMs, TimeUnit.MILLISECONDS);
            }
            finally
            {
                state.removeParentWatcher(tempWatcher);
            }

            // Charge at least 1 ms per iteration so the loop always makes progress.
            long elapsed = Math.max(1, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            waitTimeMs -= elapsed;
        }
    }
}