blob: 2ccc17307a4b9e1053f30eea48052f12144fc9b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.curator.RetryLoop;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder2;
import org.apache.curator.utils.InternalACLProvider;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
{
// Logger named after the runtime class of this builder.
private final Logger log = LoggerFactory.getLogger(getClass());
// Framework instance through which every ZooKeeper call in this builder is made.
private final CuratorFrameworkImpl client;
// Tracks whether protected mode is enabled and supplies the id embedded in protected node names.
private final ProtectedMode protectedMode = new ProtectedMode();
// Mode passed to ZooKeeper's create(); changed by withMode(...) and withProtectedEphemeralSequential().
private CreateMode createMode;
// Foreground/background configuration; replaced by each inBackground(...) call.
private Backgrounding backgrounding;
// When true, a NoNode failure triggers creation of the missing parents followed by another create attempt.
private boolean createParentsIfNeeded;
// Passed to ZKPaths.mkdirs(...) whenever parents are created.
private boolean createParentsAsContainers;
// When true, the payload is run through the client's compression provider before it is sent.
private boolean compress;
// When true, a NodeExists failure falls back to setData(...) on the existing node.
private boolean setDataIfExists;
// Version argument for the fallback setData(...) call.
private int setDataIfExistsVersion = -1;
// Supplies the ACL list for the node and the ACL provider used for created parents.
private ACLing acling;
// Optional caller-supplied Stat that receives the Stat of the created/updated node; may be null.
private Stat storingStat;
// TTL argument handed to ZooKeeper's create(); the client-only constructor sets it to -1.
private long ttl;
// Test hook: when true, the next create attempt clears the flag and throws ConnectionLossException.
@VisibleForTesting
boolean failNextCreateForTesting = false;
// Leading marker of protected node names; see getProtectedPrefix(...).
@VisibleForTesting
static final String PROTECTED_PREFIX = "_c_";
/**
 * Creates a builder bound to {@code client} with default settings: persistent mode,
 * default backgrounding, the client's ACL provider, no parent creation, no compression,
 * no set-data fallback, no stat capture and a TTL of -1.
 *
 * @param client the framework instance used for all operations
 */
CreateBuilderImpl(CuratorFrameworkImpl client)
{
    this.client = client;

    // collaborators
    this.createMode = CreateMode.PERSISTENT;
    this.backgrounding = new Backgrounding();
    this.acling = new ACLing(client.getAclProvider());

    // every optional feature starts switched off
    this.createParentsIfNeeded = false;
    this.createParentsAsContainers = false;
    this.compress = false;
    this.setDataIfExists = false;

    this.storingStat = null;
    this.ttl = -1;
}
/**
 * Creates a fully configured builder.
 *
 * @param client                    the framework instance used for all operations
 * @param createMode                mode passed to ZooKeeper's create
 * @param backgrounding             foreground/background configuration
 * @param createParentsIfNeeded     whether missing parents are created on a NoNode failure
 * @param createParentsAsContainers flag passed to {@code ZKPaths.mkdirs} when parents are created
 * @param doProtected               whether protected mode is switched on
 * @param compress                  whether the payload is compressed before being sent
 * @param setDataIfExists           whether a NodeExists failure falls back to setData
 * @param aclList                   ACL list handed to the {@code ACLing} helper
 * @param storingStat               optional stat to fill in; may be {@code null}
 * @param ttl                       TTL passed to ZooKeeper's create
 */
public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat, long ttl)
{
    this.client = client;
    this.createMode = createMode;
    this.backgrounding = backgrounding;

    this.createParentsIfNeeded = createParentsIfNeeded;
    this.createParentsAsContainers = createParentsAsContainers;
    this.compress = compress;
    this.setDataIfExists = setDataIfExists;

    this.acling = new ACLing(client.getAclProvider(), aclList);
    this.storingStat = storingStat;
    this.ttl = ttl;

    // protection is tracked by the ProtectedMode helper rather than by a plain flag
    if ( doProtected )
    {
        this.protectedMode.setProtectedMode();
    }
}
/**
 * Sets the version used by the setData fallback that runs when the node already exists.
 *
 * @param version version argument for the fallback setData call
 */
public void setSetDataIfExistsVersion(int version)
{
    setDataIfExistsVersion = version;
}
/**
 * Enables the setData fallback with a version of -1.
 *
 * @return this builder
 */
@Override
public CreateBuilder2 orSetData()
{
    final int unspecifiedVersion = -1;
    return orSetData(unspecifiedVersion);
}
/**
 * Enables the setData fallback that runs when the node already exists.
 *
 * @param version version argument for the fallback setData call
 * @return this builder
 */
@Override
public CreateBuilder2 orSetData(int version)
{
    this.setDataIfExistsVersion = version;
    this.setDataIfExists = true;
    return this;
}
/**
 * Records the TTL that is passed to ZooKeeper's create call.
 *
 * @param ttl the TTL value
 * @return this builder
 */
@Override
public CreateBuilderMain withTtl(long ttl)
{
    this.ttl = ttl;
    return this;
}
/**
 * Adapts this builder into a transaction-scoped create builder. Configuration calls are
 * forwarded to this builder; {@code forPath} does not contact ZooKeeper but appends a
 * create {@link Op} to {@code transaction} and returns {@code context}.
 *
 * @param context     value returned from the adapter's {@code forPath} methods
 * @param transaction record that collects the create operation
 * @return the transaction-scoped adapter
 */
<T> TransactionCreateBuilder<T> asTransactionCreateBuilder(final T context, final CuratorMultiTransactionRecord transaction)
{
    final CreateBuilderImpl builder = this;
    return new TransactionCreateBuilder<T>()
    {
        // ---- terminal operations ----

        @Override
        public T forPath(String path) throws Exception
        {
            return forPath(path, client.getDefaultData());
        }

        @Override
        public T forPath(String path, byte[] data) throws Exception
        {
            byte[] payload = data;
            if ( compress )
            {
                payload = client.getCompressionProvider().compress(path, payload);
            }

            String namespacedPath = client.fixForNamespace(path);
            Op createOp = Op.create(namespacedPath, payload, acling.getAclList(path), createMode, ttl);
            transaction.add(createOp, OperationType.CREATE, path);
            return context;
        }

        // ---- configuration, forwarded to the enclosing builder ----

        @Override
        public ACLCreateModePathAndBytesable<T> compressed()
        {
            builder.compressed();
            return this;
        }

        @Override
        public ACLPathAndBytesable<T> withMode(CreateMode mode)
        {
            builder.withMode(mode);
            return this;
        }

        @Override
        public TransactionCreateBuilder2<T> withTtl(long ttl)
        {
            builder.withTtl(ttl);
            return this;
        }

        @Override
        public PathAndBytesable<T> withACL(List<ACL> aclList)
        {
            return withACL(aclList, false);
        }

        @Override
        public PathAndBytesable<T> withACL(List<ACL> aclList, boolean applyToParents)
        {
            builder.withACL(aclList, applyToParents);
            return this;
        }
    };
}
/**
 * Turns on payload compression and returns a view exposing the remaining options.
 * Every method of the returned view operates on this builder.
 *
 * @return a view over this builder
 */
@Override
public CreateBackgroundModeStatACLable compressed()
{
    compress = true;

    final CreateBuilderImpl builder = this;
    return new CreateBackgroundModeStatACLable()
    {
        // ---- terminal operations ----

        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        // ---- parent creation ----

        @Override
        public ACLCreateModePathAndBytesable<String> creatingParentsIfNeeded()
        {
            createParentsIfNeeded = true;
            return asACLCreateModePathAndBytesable();
        }

        @Override
        public ACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded()
        {
            setCreateParentsAsContainers();
            return creatingParentsIfNeeded();
        }

        // ---- stat, mode, protection, ACL ----

        @Override
        public CreateBackgroundModeACLable storingStatIn(Stat stat)
        {
            storingStat = stat;
            return asCreateBackgroundModeACLable();
        }

        @Override
        public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
        {
            return builder.withMode(mode);
        }

        @Override
        public ACLPathAndBytesable<String> withProtectedEphemeralSequential()
        {
            return builder.withProtectedEphemeralSequential();
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return builder.withACL(aclList);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            return builder.withACL(aclList, applyToParents);
        }

        // ---- backgrounding ----

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground()
        {
            return builder.inBackground();
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(Object context)
        {
            return builder.inBackground(context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
        {
            return builder.inBackground(callback);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
        {
            return builder.inBackground(callback, context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
        {
            return builder.inBackground(callback, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
        {
            return builder.inBackground(callback, context, executor);
        }
    };
}
/**
 * Sets the ACL list without applying it to parents.
 *
 * @param aclList the ACL list to use
 * @return a view exposing the remaining options
 */
@Override
public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
{
    final boolean applyToParents = false;
    return withACL(aclList, applyToParents);
}
/**
 * Replaces the ACL configuration with one built from the client's ACL provider,
 * {@code aclList} and {@code applyToParents}, then returns a view exposing the
 * remaining options. Every method of the view forwards to this builder.
 *
 * @param aclList        the ACL list to use
 * @param applyToParents flag handed to the {@code ACLing} helper
 * @return a view over this builder
 */
@Override
public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
{
    acling = new ACLing(client.getAclProvider(), aclList, applyToParents);

    final CreateBuilderImpl builder = this;
    return new ACLBackgroundPathAndBytesable<String>()
    {
        // ---- terminal operations ----

        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        // ---- ACL ----

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return builder.withACL(aclList);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            return builder.withACL(aclList, applyToParents);
        }

        // ---- backgrounding ----

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground()
        {
            return builder.inBackground();
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(Object context)
        {
            return builder.inBackground(context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
        {
            return builder.inBackground(callback);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
        {
            return builder.inBackground(callback, context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
        {
            return builder.inBackground(callback, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
        {
            return builder.inBackground(callback, context, executor);
        }
    };
}
/**
 * Requests parent creation, asking for the container flag first (subject to the
 * client's {@code useContainerParentsIfAvailable()} answer).
 *
 * @return the view returned by {@link #creatingParentsIfNeeded()}
 */
@Override
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentContainersIfNeeded()
{
    // decide on the container flag first, then enable parent creation as usual
    setCreateParentsAsContainers();
    ProtectACLCreateModeStatPathAndBytesable<String> view = creatingParentsIfNeeded();
    return view;
}
/**
 * Sets the container flag for parent creation, but only when the client reports
 * that container parents should be used; otherwise the flag is left untouched.
 */
private void setCreateParentsAsContainers()
{
    if ( !client.useContainerParentsIfAvailable() )
    {
        return;
    }
    createParentsAsContainers = true;
}
/**
 * Enables creation of missing parents and returns a view exposing the remaining
 * options. Every method of the view operates on this builder.
 *
 * @return a view over this builder
 */
@Override
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()
{
    createParentsIfNeeded = true;

    final CreateBuilderImpl builder = this;
    return new ProtectACLCreateModeStatPathAndBytesable<String>()
    {
        // ---- terminal operations ----

        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        // ---- stat, mode, protection, ACL ----

        @Override
        public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat)
        {
            storingStat = stat;
            return builder;
        }

        @Override
        public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
        {
            return builder.withMode(mode);
        }

        @Override
        public ACLCreateModeBackgroundPathAndBytesable<String> withProtection()
        {
            return builder.withProtection();
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return withACL(aclList, false);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            return builder.withACL(aclList, applyToParents);
        }

        // ---- backgrounding ----

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground()
        {
            return builder.inBackground();
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(Object context)
        {
            return builder.inBackground(context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
        {
            return builder.inBackground(callback);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
        {
            return builder.inBackground(callback, context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
        {
            return builder.inBackground(callback, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
        {
            return builder.inBackground(callback, context, executor);
        }
    };
}
/**
 * Switches protected mode on.
 *
 * @return a view exposing the remaining options
 */
@Override
public ACLCreateModeStatBackgroundPathAndBytesable<String> withProtection()
{
    protectedMode.setProtectedMode();
    ACLCreateModeStatBackgroundPathAndBytesable<String> view = asACLCreateModeStatBackgroundPathAndBytesable();
    return view;
}
/**
 * Switches protected mode on and forces the create mode to
 * {@link CreateMode#EPHEMERAL_SEQUENTIAL}.
 *
 * @return a view limited to ACL selection and the terminal {@code forPath} calls
 */
@Override
public ACLPathAndBytesable<String> withProtectedEphemeralSequential()
{
    protectedMode.setProtectedMode();
    createMode = CreateMode.EPHEMERAL_SEQUENTIAL;

    final CreateBuilderImpl builder = this;
    return new ACLPathAndBytesable<String>()
    {
        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        @Override
        public PathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return builder.withACL(aclList);
        }

        @Override
        public PathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            return builder.withACL(aclList, applyToParents);
        }
    };
}
/**
 * Records the create mode passed to ZooKeeper's create call.
 *
 * @param mode the create mode
 * @return this builder
 */
@Override
public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
{
    this.createMode = mode;
    return this;
}
/**
 * Replaces the backgrounding configuration with one built from a callback and a context.
 *
 * @param callback the background callback
 * @param context  the caller-supplied context object
 * @return this builder
 */
@Override
public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
{
    this.backgrounding = new Backgrounding(callback, context);
    return this;
}
/**
 * Replaces the backgrounding configuration with one built from the client, a callback,
 * a context and an executor.
 *
 * @param callback the background callback
 * @param context  the caller-supplied context object
 * @param executor the executor handed to the backgrounding configuration
 * @return this builder
 */
@Override
public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
{
    this.backgrounding = new Backgrounding(client, callback, context, executor);
    return this;
}
/**
 * Replaces the backgrounding configuration with one built from a callback only.
 *
 * @param callback the background callback
 * @return this builder
 */
@Override
public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
{
    this.backgrounding = new Backgrounding(callback);
    return this;
}
/**
 * Replaces the backgrounding configuration with one built from the client, a callback
 * and an executor.
 *
 * @param callback the background callback
 * @param executor the executor handed to the backgrounding configuration
 * @return this builder
 */
@Override
public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
{
    this.backgrounding = new Backgrounding(client, callback, executor);
    return this;
}
/**
 * Replaces the backgrounding configuration with {@code new Backgrounding(true)},
 * i.e. without callback or context.
 *
 * @return this builder
 */
@Override
public ErrorListenerPathAndBytesable<String> inBackground()
{
    this.backgrounding = new Backgrounding(true);
    return this;
}
/**
 * Replaces the backgrounding configuration with one built from a context only.
 *
 * @param context the caller-supplied context object
 * @return this builder
 */
@Override
public ErrorListenerPathAndBytesable<String> inBackground(Object context)
{
    this.backgrounding = new Backgrounding(context);
    return this;
}
/**
 * Wraps the current backgrounding configuration together with {@code listener}
 * in a new configuration.
 *
 * @param listener the unhandled-error listener
 * @return this builder
 */
@Override
public PathAndBytesable<String> withUnhandledErrorListener(UnhandledErrorListener listener)
{
    final Backgrounding previous = this.backgrounding;
    this.backgrounding = new Backgrounding(previous, listener);
    return this;
}
/**
 * Creates the node using the client's default data as payload.
 *
 * @param path the path to create
 * @return whatever {@link #forPath(String, byte[])} returns
 * @throws Exception on any failure of the underlying operation
 */
@Override
public String forPath(String path) throws Exception
{
    byte[] defaultData = client.getDefaultData();
    return forPath(path, defaultData);
}
/**
 * Creates the node at {@code givenPath} with {@code data}.
 * <p>
 * The payload is compressed when compression is enabled, the path is namespaced and
 * (in protected mode) given its protected prefix, and the request is validated against
 * the client's schema set. The create then runs in the background or the foreground,
 * depending on the backgrounding configuration.
 *
 * @param givenPath the path as supplied by the caller
 * @param data      the payload
 * @return the created path with the namespace removed for foreground operations;
 *         {@code null} for background operations
 * @throws Exception on validation or ZooKeeper failures
 */
@Override
public String forPath(final String givenPath, byte[] data) throws Exception
{
    if ( compress )
    {
        data = client.getCompressionProvider().compress(givenPath, data);
    }

    final String namespacedPath = client.fixForNamespace(givenPath, createMode.isSequential());
    final String adjustedPath = adjustPath(namespacedPath);
    List<ACL> aclList = acling.getAclList(adjustedPath);
    client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

    if ( backgrounding.inBackground() )
    {
        // the outcome is delivered through the background callback, not the return value
        pathInBackground(adjustedPath, data, givenPath);
        return null;
    }

    String createdPath = protectedPathInForeground(adjustedPath, data, aclList);
    return client.unfixForNamespace(createdPath);
}
/**
 * Runs the foreground create and, in protected mode, schedules a cleanup of the
 * protected node whenever the outcome of a failed create is unknown.
 *
 * @param adjustedPath the namespaced (and possibly protected) path
 * @param data         the payload
 * @param aclList      the ACL list for the node
 * @return the path returned by the foreground create
 * @throws Exception the original failure, always rethrown
 */
private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception
{
    try
    {
        return pathInForeground(adjustedPath, data, aclList);
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);

        final boolean isKeeperFailure = e instanceof KeeperException;
        final boolean outcomeUnknown = (e instanceof KeeperException.ConnectionLossException) || !isKeeperFailure;
        if ( outcomeUnknown && protectedMode.doProtected() )
        {
            // CURATOR-45 + CURATOR-79: the create may or may not have succeeded on the
            // server, so register the znode for deletion to be on the safe side.
            String parentPath = ZKPaths.getPathAndNode(adjustedPath).getPath();
            new FindAndDeleteProtectedNodeInBackground(client, parentPath, protectedMode.protectedId()).execute();

            // The current id is now scheduled for deletion and must not be reused;
            // a later use of this builder gets a fresh one.
            protectedMode.resetProtectedId();
        }
        throw e;
    }
}
/**
 * Issues the asynchronous ZooKeeper create for a queued background operation.
 * <p>
 * The result callback commits the trace, copies the returned stat into
 * {@code storingStat} when both are non-null, and then either creates the missing
 * parents (NONODE with parent creation enabled), falls back to setData (NODEEXISTS
 * with the set-data fallback enabled), or delivers the result as a background event.
 * Any throwable raised while issuing the request is passed to
 * {@code backgrounding.checkError}.
 *
 * @param operationAndData the queued operation carrying path and payload
 * @throws Exception declared by the interface
 */
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
try
{
final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
final byte[] data = operationAndData.getData().getData();
AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
{
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat)
{
trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();
// hand the server-side stat back to the caller when one was requested
if ( (stat != null) && (storingStat != null) )
{
DataTree.copyStat(stat, storingStat);
}
if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
{
// parents are missing: create them, after which the main operation is queued again
backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, acling.getACLProviderForParents(), createParentsAsContainers);
}
else if ( (rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists )
{
// node already exists: overwrite its data instead
backgroundSetData(client, operationAndData, operationAndData.getData().getPath(), backgrounding);
}
else
{
sendBackgroundResponse(rc, path, ctx, name, stat, operationAndData);
}
}
};
client.getZooKeeper().create
(
operationAndData.getData().getPath(),
data,
acling.getAclList(operationAndData.getData().getPath()),
createMode,
callback,
backgrounding.getContext(),
ttl
);
}
catch ( Throwable e )
{
backgrounding.checkError(e, null);
}
}
/**
 * Registers the stat object to be filled in and returns a view exposing the
 * remaining options. Every method of the view forwards to this builder.
 *
 * @param stat the stat to fill in
 * @return a view over this builder
 */
@Override
public CreateProtectACLCreateModePathAndBytesable<String> storingStatIn(Stat stat)
{
    storingStat = stat;

    final CreateBuilderImpl builder = this;
    return new CreateProtectACLCreateModePathAndBytesable<String>()
    {
        // ---- terminal operations ----

        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        // ---- parent creation ----

        @Override
        public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded()
        {
            return builder.creatingParentsIfNeeded();
        }

        @Override
        public ProtectACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded()
        {
            return builder.creatingParentContainersIfNeeded();
        }

        // ---- mode, protection, ACL ----

        @Override
        public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
        {
            return builder.withMode(mode);
        }

        @Override
        public ACLCreateModeBackgroundPathAndBytesable<String> withProtection()
        {
            return builder.withProtection();
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return builder.withACL(aclList);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            return builder.withACL(aclList, applyToParents);
        }

        // ---- backgrounding ----

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground()
        {
            return builder.inBackground();
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(Object context)
        {
            return builder.inBackground(context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
        {
            return builder.inBackground(callback);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
        {
            return builder.inBackground(callback, context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
        {
            return builder.inBackground(callback, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
        {
            return builder.inBackground(callback, context, executor);
        }
    };
}
/**
 * Builds the name prefix of a protected node: {@code PROTECTED_PREFIX}, the id, then a dash.
 *
 * @param protectedId the protected id
 * @return the prefix prepended to the node name
 */
private static String getProtectedPrefix(String protectedId)
{
    StringBuilder prefix = new StringBuilder(PROTECTED_PREFIX);
    prefix.append(protectedId).append("-");
    return prefix.toString();
}
/**
 * Queues a background operation that creates the parents of {@code path} and then
 * re-queues the main operation.
 *
 * @param client                    the framework instance
 * @param mainOperationAndData      the operation to queue again once the parents were handled
 * @param path                      the path whose parents are created
 * @param backgrounding             supplies the context of the parent operation
 * @param aclProvider               ACL provider passed to {@code ZKPaths.mkdirs}
 * @param createParentsAsContainers flag passed to {@code ZKPaths.mkdirs}
 */
static <T> void backgroundCreateParentsThenNode(final CuratorFrameworkImpl client, final OperationAndData<T> mainOperationAndData, final String path, Backgrounding backgrounding, final InternalACLProvider aclProvider, final boolean createParentsAsContainers)
{
    BackgroundOperation<T> createParents = new BackgroundOperation<T>()
    {
        @Override
        public void performBackgroundOperation(OperationAndData<T> unused) throws Exception
        {
            try
            {
                ZKPaths.mkdirs(client.getZooKeeper(), path, false, aclProvider, createParentsAsContainers);
            }
            catch ( KeeperException e )
            {
                boolean retryAllowed = client.getZookeeperClient().getRetryPolicy().allowRetry(e);
                if ( !retryAllowed )
                {
                    throw e;
                }
                // a retryable failure is safe to ignore here as it will get retried
            }
            client.queueOperation(mainOperationAndData);
        }
    };

    Object parentContext = backgrounding.getContext();
    OperationAndData<T> parentOperation = new OperationAndData<>(createParents, mainOperationAndData.getData(), null, null, parentContext, null);
    client.queueOperation(parentOperation);
}
/**
 * Background fallback for {@code orSetData()}: queues an asynchronous setData on
 * {@code path} using the payload of the main operation.
 * <p>
 * When the setData reports NONODE, the main create operation is queued again.
 * Otherwise the result is delivered as a background CREATE event. The stat returned
 * by setData is also copied into {@code storingStat} when one was requested, matching
 * what the foreground path and the background create callback already do.
 *
 * @param client               the framework instance
 * @param mainOperationAndData the original create operation
 * @param path                 the path whose data is set
 * @param backgrounding        supplies the context passed to the asynchronous call
 */
private void backgroundSetData(final CuratorFrameworkImpl client, final OperationAndData<PathAndBytes> mainOperationAndData, final String path, final Backgrounding backgrounding)
{
    final AsyncCallback.StatCallback statCallback = new AsyncCallback.StatCallback()
    {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat)
        {
            // honour storingStatIn(...) for the set-data fallback as well
            if ( (stat != null) && (storingStat != null) )
            {
                DataTree.copyStat(stat, storingStat);
            }

            if ( rc == KeeperException.Code.NONODE.intValue() )
            {
                client.queueOperation(mainOperationAndData);    // try to create it again
            }
            else
            {
                sendBackgroundResponse(rc, path, ctx, path, stat, mainOperationAndData);
            }
        }
    };

    BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
    {
        @Override
        public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception
        {
            try
            {
                client.getZooKeeper().setData(path, mainOperationAndData.getData().getData(), setDataIfExistsVersion, statCallback, backgrounding.getContext());
            }
            catch ( KeeperException e )
            {
                // ignore
                // NOTE(review): when this is swallowed no callback fires for the main
                // operation - confirm that this best-effort behaviour is intended.
            }
        }
    };
    client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
}
/**
 * Wraps a create result in a CREATE {@link CuratorEvent}, with the namespace removed
 * from path and name, and hands it to the client's background processing.
 *
 * @param rc               the ZooKeeper result code
 * @param path             the namespaced path
 * @param ctx              the context object
 * @param name             the namespaced name of the created node
 * @param stat             the stat of the node, may be {@code null}
 * @param operationAndData the operation the event belongs to
 */
private void sendBackgroundResponse(int rc, String path, Object ctx, String name, Stat stat, OperationAndData<PathAndBytes> operationAndData)
{
    final String clientPath = client.unfixForNamespace(path);
    final String clientName = client.unfixForNamespace(name);

    CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.CREATE, rc, clientPath, clientName, ctx, stat, null, null, null, null, null);
    client.processBackgroundOperation(operationAndData, event);
}
/**
 * Returns a view over this builder that offers ACL selection, mode selection and the
 * terminal {@code forPath} calls.
 *
 * @return a view over this builder
 */
private ACLCreateModePathAndBytesable<String> asACLCreateModePathAndBytesable()
{
    final CreateBuilderImpl builder = this;
    return new ACLCreateModePathAndBytesable<String>()
    {
        // ---- terminal operations ----

        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        // ---- ACL ----

        @Override
        public PathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return builder.withACL(aclList);
        }

        @Override
        public PathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            builder.withACL(aclList, applyToParents);
            return this;
        }

        // ---- mode: narrows the view to ACL selection plus forPath ----

        @Override
        public ACLPathAndBytesable<String> withMode(CreateMode mode)
        {
            createMode = mode;
            return new ACLPathAndBytesable<String>()
            {
                @Override
                public String forPath(String path) throws Exception
                {
                    return builder.forPath(path);
                }

                @Override
                public String forPath(String path, byte[] data) throws Exception
                {
                    return builder.forPath(path, data);
                }

                @Override
                public PathAndBytesable<String> withACL(List<ACL> aclList)
                {
                    return builder.withACL(aclList);
                }

                @Override
                public PathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
                {
                    return builder.withACL(aclList, applyToParents);
                }
            };
        }
    };
}
/**
 * Returns a view over this builder that offers ACL, mode, protection, backgrounding,
 * parent-creation options and the terminal {@code forPath} calls.
 * <p>
 * {@code creatingParentContainersIfNeeded()} requests the container flag and then
 * enables parent creation through {@code creatingParentsIfNeeded()}, the same way the
 * other views in this class do. Without that second step the container flag alone
 * has no effect, because parents are only created when {@code createParentsIfNeeded}
 * is set.
 *
 * @return a view over this builder
 */
private CreateBackgroundModeACLable asCreateBackgroundModeACLable()
{
    return new CreateBackgroundModeACLable()
    {
        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return CreateBuilderImpl.this.withACL(aclList);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            return CreateBuilderImpl.this.withACL(aclList, applyToParents);
        }

        @Override
        public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
        {
            return CreateBuilderImpl.this.withMode(mode);
        }

        @Override
        public String forPath(String path) throws Exception
        {
            return CreateBuilderImpl.this.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return CreateBuilderImpl.this.forPath(path, data);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
        {
            return CreateBuilderImpl.this.inBackground(callback, context, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
        {
            return CreateBuilderImpl.this.inBackground(callback, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
        {
            return CreateBuilderImpl.this.inBackground(callback, context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
        {
            return CreateBuilderImpl.this.inBackground(callback);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(Object context)
        {
            return CreateBuilderImpl.this.inBackground(context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground()
        {
            return CreateBuilderImpl.this.inBackground();
        }

        @Override
        public ACLPathAndBytesable<String> withProtectedEphemeralSequential()
        {
            return CreateBuilderImpl.this.withProtectedEphemeralSequential();
        }

        @Override
        public ACLCreateModePathAndBytesable<String> creatingParentsIfNeeded()
        {
            createParentsIfNeeded = true;
            return asACLCreateModePathAndBytesable();
        }

        @Override
        public ACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded()
        {
            setCreateParentsAsContainers();
            // must also enable parent creation, otherwise the container flag is never used
            return creatingParentsIfNeeded();
        }
    };
}
/**
 * Returns a view over this builder that offers ACL, mode, stat capture, backgrounding
 * and the terminal {@code forPath} calls.
 *
 * @return a view over this builder
 */
private ACLCreateModeStatBackgroundPathAndBytesable<String> asACLCreateModeStatBackgroundPathAndBytesable()
{
    final CreateBuilderImpl builder = this;
    return new ACLCreateModeStatBackgroundPathAndBytesable<String>()
    {
        // ---- terminal operations ----

        @Override
        public String forPath(String path) throws Exception
        {
            return builder.forPath(path);
        }

        @Override
        public String forPath(String path, byte[] data) throws Exception
        {
            return builder.forPath(path, data);
        }

        // ---- stat, mode, ACL ----

        @Override
        public ACLCreateModeBackgroundPathAndBytesable<String> storingStatIn(Stat stat)
        {
            storingStat = stat;
            return builder;
        }

        @Override
        public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
        {
            return builder.withMode(mode);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList)
        {
            return builder.withACL(aclList);
        }

        @Override
        public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
        {
            builder.withACL(aclList, applyToParents);
            return this;
        }

        // ---- backgrounding ----

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground()
        {
            return builder.inBackground();
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(Object context)
        {
            return builder.inBackground(context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback)
        {
            return builder.inBackground(callback);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context)
        {
            return builder.inBackground(callback, context);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Executor executor)
        {
            return builder.inBackground(callback, executor);
        }

        @Override
        public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context, Executor executor)
        {
            return builder.inBackground(callback, context, executor);
        }
    };
}
// Test hook: while true, the next create attempt is treated as a retry, so in protected
// mode it first searches for an existing protected node; that search clears the flag.
@VisibleForTesting
volatile boolean debugForceFindProtectedNode = false;
/**
 * Wraps the create in an {@code OperationAndData} and hands it to the client's
 * background processing.
 * <p>
 * In protected mode every attempt after the first one first searches for a node that
 * an earlier attempt may already have created. If one is found, an OK response is sent
 * and the create is skipped. If the search fails with a connection loss, a
 * CONNECTIONLOSS response is sent and the create is skipped as well. When retries are
 * exhausted in protected mode, a background cleanup of the protected node is scheduled
 * and the protected id is reset.
 *
 * @param path      the namespaced (and possibly protected) path
 * @param data      the payload
 * @param givenPath the path as supplied by the caller; only used in the error log message
 */
private void pathInBackground(final String path, final byte[] data, final String givenPath)
{
// true only until the first attempt has run; later attempts are retries
final AtomicBoolean firstTime = new AtomicBoolean(true);
OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(),
new OperationAndData.ErrorCallback<PathAndBytes>()
{
public void retriesExhausted(OperationAndData<PathAndBytes> operationAndData)
{
if ( protectedMode.doProtected() )
{
// all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up
new FindAndDeleteProtectedNodeInBackground(client, ZKPaths.getPathAndNode(path).getPath(), protectedMode.protectedId()).execute();
// assign a new id if this builder is used again later
protectedMode.resetProtectedId();
}
}
},
backgrounding.getContext(),
null)
{
@Override
void callPerformBackgroundOperation() throws Exception
{
boolean callSuper = true;
// the debug flag makes even the first attempt behave like a retry
boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
protectedMode.checkSetSessionId(client, createMode);
if ( !localFirstTime && protectedMode.doProtected() )
{
debugForceFindProtectedNode = false;
String createdPath = null;
try
{
createdPath = findProtectedNodeInForeground(path);
}
catch ( KeeperException.ConnectionLossException e )
{
// report the connection loss instead of issuing the create
sendBackgroundResponse(KeeperException.Code.CONNECTIONLOSS.intValue(), path, backgrounding.getContext(), null, null, this);
callSuper = false;
}
if ( createdPath != null )
{
// an earlier attempt already created the node: report success and skip the create
try
{
sendBackgroundResponse(KeeperException.Code.OK.intValue(), createdPath, backgrounding.getContext(), createdPath, null, this);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
client.logError("Processing protected create for path: " + givenPath, e);
}
callSuper = false;
}
}
if ( failNextCreateForTesting )
{
failNextCreateForTesting = false;
pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client
throw new KeeperException.ConnectionLossException();
}
if ( callSuper )
{
super.callPerformBackgroundOperation();
}
}
};
client.processBackgroundOperation(operationAndData, null);
}
/**
 * Performs the create synchronously inside a retry loop.
 * <p>
 * In protected mode every attempt after the first one first searches for a node that
 * an earlier attempt may already have created and, if found, returns it without
 * creating again. A NoNode failure triggers parent creation followed by a second
 * create when parent creation is enabled. A NodeExists failure falls back to setData
 * when that fallback is enabled. Both failures are rethrown otherwise.
 *
 * @param path    the namespaced (and possibly protected) path
 * @param data    the payload
 * @param aclList the ACL list used for the first create attempt
 * @return the path of the created (or found, or updated) node
 * @throws Exception on ZooKeeper failures once retries are exhausted
 */
private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception
{
OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");
// true only until the first attempt has run; later attempts are retries
final AtomicBoolean firstTime = new AtomicBoolean(true);
String returnPath = RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<String>()
{
@Override
public String call() throws Exception
{
// the debug flag makes even the first attempt behave like a retry
boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
protectedMode.checkSetSessionId(client, createMode);
String createdPath = null;
if ( !localFirstTime && protectedMode.doProtected() )
{
debugForceFindProtectedNode = false;
createdPath = findProtectedNodeInForeground(path);
}
// nothing found (or not a protected retry): issue the create
if ( createdPath == null )
{
try
{
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
}
catch ( KeeperException.NoNodeException e )
{
if ( createParentsIfNeeded )
{
// create the missing parents, then try the node once more
ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
}
else
{
throw e;
}
}
catch ( KeeperException.NodeExistsException e )
{
if ( setDataIfExists )
{
// overwrite the existing node and report its path as the result
Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
if(storingStat != null)
{
DataTree.copyStat(setStat, storingStat);
}
createdPath = path;
}
else
{
throw e;
}
}
}
// test hook: the create above has been carried out, but a connection loss is reported
if ( failNextCreateForTesting )
{
failNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
return createdPath;
}
}
);
// only reached when the retry loop returned normally
trace.setRequestBytesLength(data).setPath(path).commit();
return returnPath;
}
/**
 * Searches, inside a retry loop, the parent of {@code path} for a child carrying the
 * current protected id and lets the protected-mode helper validate the result.
 *
 * @param path the namespaced, protected path whose parent is searched
 * @return the result of the lookup, or {@code null} when nothing was found or the
 *         parent does not exist
 * @throws Exception on ZooKeeper failures once retries are exhausted
 */
private String findProtectedNodeInForeground(final String path) throws Exception
{
    OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-findProtectedNodeInForeground");

    String returnPath = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                String match = null;
                try
                {
                    final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
                    List<String> siblings = client.getZooKeeper().getChildren(split.getPath(), false);
                    match = findNode(siblings, split.getPath(), protectedMode.protectedId());
                    log.debug("Protected mode findNode result: {}", match);
                    match = protectedMode.validateFoundNode(client, createMode, match);
                }
                catch ( KeeperException.NoNodeException ignore )
                {
                    // a missing node is not an error here; whatever was found so far is returned
                }
                return match;
            }
        }
    );

    trace.setPath(path).commit();
    return returnPath;
}
/**
 * In protected mode, prepends the protected prefix to the node name of {@code path};
 * otherwise returns {@code path} unchanged.
 *
 * @param path the path to adjust
 * @return the adjusted path
 * @throws Exception declared for callers; see the helpers used
 */
@VisibleForTesting
String adjustPath(String path) throws Exception
{
    if ( !protectedMode.doProtected() )
    {
        return path;
    }

    ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
    String protectedName = getProtectedPrefix(protectedMode.protectedId()) + split.getNode();
    return ZKPaths.makePath(split.getPath(), protectedName);
}
/**
 * Looks for the first candidate whose name starts with the protected prefix derived
 * from {@code protectedId}.
 *
 * @param children    the candidate znode names
 * @param path        the parent path the candidates live under
 * @param protectedId the protected id
 * @return the absolute path of the matching znode, or <code>null</code> if none matches
 */
static String findNode(final List<String> children, final String path, final String protectedId)
{
    final String protectedPrefix = getProtectedPrefix(protectedId);
    for ( String candidate : children )
    {
        if ( candidate.startsWith(protectedPrefix) )
        {
            // first match wins
            return ZKPaths.makePath(path, candidate);
        }
    }
    return null;
}
}