blob: bbd9203d3f88f9775816aa626a0674ebd7abfc9c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import org.apache.curator.RetryLoop;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.PathUtils;
import org.apache.zookeeper.KeeperException;
import java.util.Arrays;
* <p>A distributed value that attempts atomic sets. It first tries uses optimistic locking. If that fails,
* an optional {@link InterProcessMutex} is taken. For both optimistic and mutex, a retry policy is used to
* retry the increment.</p>
* <p>The various methods return an {@link AtomicValue} object. You must <b>always</b> check
* {@link AtomicValue#succeeded()}. None of the methods (other than get()) are guaranteed to succeed.</p>
public class DistributedAtomicValue
private final CuratorFramework client;
private final String path;
private final RetryPolicy retryPolicy;
private final PromotedToLock promotedToLock;
private final InterProcessMutex mutex;
* Creates in optimistic mode only - i.e. the promotion to a mutex is not done
* @param client the client
* @param path path to hold the value
* @param retryPolicy the retry policy to use
public DistributedAtomicValue(CuratorFramework client, String path, RetryPolicy retryPolicy)
this(client, path, retryPolicy, null);
* Creates in mutex promotion mode. The optimistic lock will be tried first using
* the given retry policy. If the increment does not succeed, a {@link InterProcessMutex} will be tried
* with its own retry policy
* @param client the client
* @param path path to hold the value
* @param retryPolicy the retry policy to use
* @param promotedToLock the arguments for the mutex promotion
public DistributedAtomicValue(CuratorFramework client, String path, RetryPolicy retryPolicy, PromotedToLock promotedToLock)
this.client = client;
this.path = PathUtils.validatePath(path);
this.retryPolicy = retryPolicy;
this.promotedToLock = promotedToLock;
mutex = (promotedToLock != null) ? new InterProcessMutex(client, promotedToLock.getPath()) : null;
* Returns the current value of the counter. NOTE: if the value has never been set,
* <code>0</code> is returned.
* @return value info
* @throws Exception ZooKeeper errors
public AtomicValue<byte[]> get() throws Exception
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
getCurrentValue(result, new Stat());
result.postValue = result.preValue;
result.succeeded = true;
return result;
* Forcibly sets the value any guarantees of atomicity.
* @param newValue the new value
* @throws Exception ZooKeeper errors
public void forceSet(byte[] newValue) throws Exception
client.setData().forPath(path, newValue);
catch ( KeeperException.NoNodeException dummy )
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
catch ( KeeperException.NodeExistsException dummy2 )
client.setData().forPath(path, newValue);
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
* Remember to always check {@link AtomicValue#succeeded()}.
* @param expectedValue the expected value
* @param newValue the new value
* @return value info
* @throws Exception ZooKeeper errors
public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
Stat stat = new Stat();
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
catch ( KeeperException.BadVersionException dummy )
result.succeeded = false;
catch ( KeeperException.NoNodeException dummy )
result.succeeded = false;
result.succeeded = false;
return result;
* Attempt to atomically set the value to the given value. Remember to always
* check {@link AtomicValue#succeeded()}.
* @param newValue the value to set
* @return value info
* @throws Exception ZooKeeper errors
public AtomicValue<byte[]> trySet(final byte[] newValue) throws Exception
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MakeValue makeValue = new MakeValue()
public byte[] makeFrom(byte[] previous)
return newValue;
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
tryWithMutex(result, makeValue);
return result;
* Atomic values are initially set to the equivalent of <code>NULL</code> in a database.
* Use this method to initialize the value. The value will be set if and only iff the node does not exist.
* @param value the initial value to set
* @return true if the value was set, false if the node already existed
* @throws Exception ZooKeeper errors
public boolean initialize(byte[] value) throws Exception
client.create().creatingParentContainersIfNeeded().forPath(path, value);
catch ( KeeperException.NodeExistsException ignore )
// ignore
return false;
return true;
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
tryWithMutex(result, makeValue);
return result;
RuntimeException createCorruptionException(byte[] bytes)
StringBuilder str = new StringBuilder();
boolean first = true;
for ( byte b : bytes )
if ( first )
first = false;
str.append(", ");
str.append("0x").append(Integer.toHexString((b & 0xff)));
return new RuntimeException(String.format("Corrupted data for node \"%s\": %s", path, str.toString()));
private boolean getCurrentValue(MutableAtomicValue<byte[]> result, Stat stat) throws Exception
boolean createIt = false;
result.preValue = client.getData().storingStatIn(stat).forPath(path);
catch ( KeeperException.NoNodeException e )
result.preValue = null;
createIt = true;
return createIt;
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
long startMs = System.currentTimeMillis();
int retryCount = 0;
if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
boolean done = false;
while ( !done )
if ( tryOnce(result, makeValue) )
result.succeeded = true;
done = true;
if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
done = true;
result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
long startMs = System.currentTimeMillis();
int retryCount = 0;
boolean done = false;
while ( !done )
if ( tryOnce(result, makeValue) )
result.succeeded = true;
done = true;
if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
done = true;
result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
Stat stat = new Stat();
boolean createIt = getCurrentValue(result, stat);
boolean success = false;
byte[] newValue = makeValue.makeFrom(result.preValue);
if ( createIt )
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.postValue = Arrays.copyOf(newValue, newValue.length);
success = true;
catch ( KeeperException.NodeExistsException e )
// do Retry
catch ( KeeperException.BadVersionException e )
// do Retry
catch ( KeeperException.NoNodeException e )
// do Retry
return success;