Merge 'master' into CURATOR-3.0
diff --git a/curator-client/pom.xml b/curator-client/pom.xml
index 64a3393..7bb437c 100644
--- a/curator-client/pom.xml
+++ b/curator-client/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-client</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Curator Client</name>
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 6b66e82..065ebef 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -150,7 +150,8 @@
return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
(rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
(rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
- (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
+ (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
+ (rc == KeeperException.Code.NEWCONFIGNOQUORUM.intValue());
}
/**
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
new file mode 100644
index 0000000..8f963cd
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble;
+
+public interface EnsembleListener {
+
+ void connectionStringUpdated(String connectionString);
+}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
new file mode 100644
index 0000000..70b755f
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble.dynamic;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.ensemble.EnsembleProvider;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DynamicEnsembleProvider implements EnsembleProvider, EnsembleListener {
+
+ private final AtomicReference<String> connectionString = new AtomicReference<String>();
+
+ /**
+ * The connection string to use
+ *
+ * @param connectionString connection string
+ */
+ public DynamicEnsembleProvider(String connectionString)
+ {
+ this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null"));
+ }
+
+ @Override
+ public void start() throws Exception {
+ // NOP
+ }
+
+ @Override
+ public String getConnectionString() {
+ return connectionString.get();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // NOP
+ }
+
+ @Override
+ public void connectionStringUpdated(String connectionString) {
+ this.connectionString.set(connectionString);
+ }
+}
diff --git a/curator-examples/pom.xml b/curator-examples/pom.xml
index 406cbac..536f727 100644
--- a/curator-examples/pom.xml
+++ b/curator-examples/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-examples</artifactId>
diff --git a/curator-examples/src/main/java/framework/TransactionExamples.java b/curator-examples/src/main/java/framework/TransactionExamples.java
index f559b5a..7ff8064 100644
--- a/curator-examples/src/main/java/framework/TransactionExamples.java
+++ b/curator-examples/src/main/java/framework/TransactionExamples.java
@@ -19,25 +19,24 @@
package framework;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
import java.util.Collection;
public class TransactionExamples
{
public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception
{
- // this example shows how to use ZooKeeper's new transactions
+ // this example shows how to use ZooKeeper's transactions
- Collection<CuratorTransactionResult> results = client.inTransaction()
- .create().forPath("/a/path", "some data".getBytes())
- .and()
- .setData().forPath("/another/path", "other data".getBytes())
- .and()
- .delete().forPath("/yet/another/path")
- .and()
- .commit(); // IMPORTANT! The transaction is not submitted until commit() is called
+ CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes());
+ CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes());
+ CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path");
+
+ Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp);
for ( CuratorTransactionResult result : results )
{
@@ -46,33 +45,4 @@
return results;
}
-
- /*
- These next four methods show how to use Curator's transaction APIs in a more
- traditional - one-at-a-time - manner
- */
-
- public static CuratorTransaction startTransaction(CuratorFramework client)
- {
- // start the transaction builder
- return client.inTransaction();
- }
-
- public static CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction) throws Exception
- {
- // add a create operation
- return transaction.create().forPath("/a/path", "some data".getBytes()).and();
- }
-
- public static CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction) throws Exception
- {
- // add a delete operation
- return transaction.delete().forPath("/another/path").and();
- }
-
- public static void commitTransaction(CuratorTransactionFinal transaction) throws Exception
- {
- // commit the transaction
- transaction.commit();
- }
}
diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml
index 212678d..a7df78c 100644
--- a/curator-framework/pom.xml
+++ b/curator-framework/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-framework</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Curator Framework</name>
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index b9d67b9..9239ac4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -21,7 +21,10 @@
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
@@ -122,11 +125,42 @@
public SetACLBuilder setACL();
/**
+ * Start a reconfig builder
+ *
+ * @return builder object
+ */
+ public ReconfigBuilder reconfig();
+
+ /**
+ * Start a getConfig builder
+ *
+ * @return builder object
+ */
+ public GetConfigBuilder getConfig();
+
+ /**
+ * Start a transaction builder
+ *
+ * @return builder object
+ * @deprecated use {@link #transaction()} instead
+ */
+ public CuratorTransaction inTransaction();
+
+ /**
* Start a transaction builder
*
* @return builder object
*/
- public CuratorTransaction inTransaction();
+ public CuratorMultiTransaction transaction();
+
+ /**
+ * Allocate an operation that can be used with {@link #transaction()}.
+ * NOTE: {@link CuratorOp} instances created by this builder are
+ * reusable.
+ *
+ * @return operation builder
+ */
+ public TransactionOp transactionOp();
/**
* Perform a sync on the given path - syncs are always in the background
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java
new file mode 100644
index 0000000..16f78a2
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddStatConfigEnsembleable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An non-incremental reconfiguration builder.
+ * This builder has access only to the non-incremental reconfiguration methods withMembers, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface AddStatConfigEnsembleable extends
+ Addable<StatConfigEnsembleable>,
+ StatConfigEnsembleable
+{
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Addable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Addable.java
new file mode 100644
index 0000000..e908f1e
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Addable.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+import java.util.List;
+
+public interface Addable<T>
+{
+ /**
+ * Sets one or more members that are meant to be part of the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param server The server to add as a member of the ensemble.
+ * @return this
+ */
+ T adding(String... server);
+
+ /**
+ * Sets one or more members that are meant to be part of the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The server to add as a member of the ensemble.
+ * @return this
+ */
+ T adding(List<String> servers);
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java
new file mode 100644
index 0000000..fc7fd57
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface AsyncReconfigurable {
+
+ /**
+ * Sets the configuration version to use.
+ * @param config The version of the configuration.
+ * @throws Exception
+ */
+ void fromConfig(long config) throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java
new file mode 100644
index 0000000..77c4e96
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundStatable.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface BackgroundStatable<T> extends
+ Backgroundable<T>,
+ Statable<T> {
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java
new file mode 100644
index 0000000..2bc0494
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Configurable.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface Configurable
+{
+
+ /**
+ * Sets the configuration version to use.
+ * @param config The version of the configuration.
+ * @throws Exception
+ */
+ StatEnsembleable<byte[]> fromConfig(long config) throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEvent.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEvent.java
index 2a5408c..673613c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEvent.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEvent.java
@@ -18,6 +18,8 @@
*/
package org.apache.curator.framework.api;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -80,6 +82,11 @@
public List<ACL> getACLList();
/**
+ * @return any operation results or null
+ */
+ public List<CuratorTransactionResult> getOpResults();
+
+ /**
* If {@link #getType()} returns {@link CuratorEventType#WATCHED} this will
* return the WatchedEvent
*
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 684d11b..5a2dc56 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -69,6 +69,21 @@
SET_ACL,
/**
+ * Corresponds to {@link CuratorFramework#transaction()}
+ */
+ TRANSACTION,
+
+ /**
+ * Corresponds to {@link CuratorFramework#getConfig()}
+ */
+ GET_CONFIG,
+
+ /**
+ * Corresponds to {@link CuratorFramework#reconfig()}
+ */
+ RECONFIG,
+
+ /**
* Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
*/
WATCHED,
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java
new file mode 100644
index 0000000..75ded65
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+
+public interface DataCallbackable<T> {
+
+ /**
+ * Passes a callback and a context object to the config/reconfig command.
+ * @param callback The async callback to use.
+ * @param ctx An object that will be passed to the callback.
+ * @return this
+ */
+ T usingDataCallback(DataCallback callback, Object ctx);
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java
new file mode 100644
index 0000000..c8a82fe
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Ensembleable.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface Ensembleable<T> {
+
+ T forEnsemble() throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java
new file mode 100644
index 0000000..c42e4cb
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface GetConfigBuilder extends
+ Watchable<BackgroundStatable<Ensembleable<byte[]>>>,
+ BackgroundStatable<Ensembleable<byte[]>>,
+ Ensembleable<byte[]>
+{
+}
+
+
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java
new file mode 100644
index 0000000..0ad6426
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods join and leave, so that we prevent
+ * mixing concepts that can't be used together.
+ * @param <T>
+ */
+public interface IncrementalReconfigBuilder<T> extends
+ Joinable<IncrementalReconfigBuilder<T>>,
+ Leaveable<IncrementalReconfigBuilder<T>>,
+ DataCallbackable<AsyncReconfigurable>,
+ Statable<SyncReconfigurable> {
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java
new file mode 100644
index 0000000..a905dd1
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinAddStatConfigEnsembleable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods joining and leaving, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface JoinAddStatConfigEnsembleable extends
+ Joinable<AddStatConfigEnsembleable>,
+ Addable<JoinStatConfigurable>,
+ StatConfigEnsembleable
+{
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java
new file mode 100644
index 0000000..9642297
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinLeaveStatConfigEnsembleable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods joining and leaving, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface JoinLeaveStatConfigEnsembleable extends
+ Joinable<LeaveStatConfigEnsembleable>,
+ Leaveable<JoinStatConfigEnsembleable>,
+ StatConfigEnsembleable
+{
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java
new file mode 100644
index 0000000..5fe7a8c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods joining and leaving, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface JoinStatConfigEnsembleable extends
+ Joinable<StatConfigEnsembleable>,
+ StatConfigEnsembleable
+{
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java
new file mode 100644
index 0000000..ef17ef4
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigurable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods joining and leaving, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface JoinStatConfigurable extends
+ Joinable<Configurable>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java
new file mode 100644
index 0000000..5cebe4d
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+import java.util.List;
+
+public interface Joinable<T>
+{
+ /**
+ * Adds one or more servers to joining the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param server The server joining.
+ * @return this
+ */
+ T joining(String... server);
+
+ /**
+ * Adds one or more servers to joining the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The servers joining.
+ * @return this
+ */
+ T joining(List<String> servers);
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java
new file mode 100644
index 0000000..7912d45
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveAddStatConfigEnsembleable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An non-incremental reconfiguration builder.
+ * This builder has access only to the non-incremental reconfiguration methods withMembers, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface LeaveAddStatConfigEnsembleable extends
+ Leaveable<AddStatConfigEnsembleable>,
+ Addable<LeaveStatConfigEnsembleable>,
+ StatConfigEnsembleable
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java
new file mode 100644
index 0000000..ddad854
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * An incremental reconfiguration builder.
+ * This builder has access only to the incremental reconfiguration methods joining and leaving, so that we prevent
+ * mixing concepts that can't be used together.
+ */
+public interface LeaveStatConfigEnsembleable extends
+ Leaveable<StatConfigEnsembleable>,
+ StatConfigEnsembleable
+{
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java
new file mode 100644
index 0000000..6ec3542
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+import java.util.List;
+
+public interface Leaveable<T>
+{
+ /**
+ * Sets one or more servers to leaving the ensemble.
+ *
+ * @param server The server ids
+ * @return this
+ */
+ T leaving(String... server);
+
+ /**
+ * Sets one or more servers to leaving the ensemble.
+ *
+ * @param servers The server ids
+ * @return this
+ */
+ T leaving(List<String> servers);
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java
new file mode 100644
index 0000000..438abcf
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface ReconfigBuilder extends
+ Joinable<LeaveAddStatConfigEnsembleable>,
+ Leaveable<JoinAddStatConfigEnsembleable>,
+ Addable<JoinLeaveStatConfigEnsembleable>,
+ Backgroundable<ReconfigBuilderMain>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java
new file mode 100644
index 0000000..b86af2d
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface ReconfigBuilderMain extends
+ Joinable<LeaveAddStatConfigEnsembleable>,
+ Leaveable<JoinAddStatConfigEnsembleable>,
+ Addable<JoinLeaveStatConfigEnsembleable>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java
new file mode 100644
index 0000000..4700c8c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigEnsembleable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface StatConfigEnsembleable extends
+ Configurable,
+ StatEnsembleable<byte[]>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java
new file mode 100644
index 0000000..0993b50
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/StatEnsembleable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface StatEnsembleable<T> extends
+ Statable<Ensembleable<T>>,
+ Ensembleable<T>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java
new file mode 100644
index 0000000..bd7b96b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface SyncReconfigurable {
+
+ /**
+ * Sets the configuration version to use.
+ * @param config The version of the configuration.
+ * @return The configuration data.
+ * @throws Exception
+ */
+ byte[] fromConfig(long config) throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorMultiTransaction.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorMultiTransaction.java
new file mode 100644
index 0000000..07bf191
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorMultiTransaction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api.transaction;
+
+import org.apache.curator.framework.api.Backgroundable;
+
+public interface CuratorMultiTransaction extends
+ Backgroundable<CuratorMultiTransactionMain>,
+ CuratorMultiTransactionMain
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorMultiTransactionMain.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorMultiTransactionMain.java
new file mode 100644
index 0000000..2425f5b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorMultiTransactionMain.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api.transaction;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.List;
+
+public interface CuratorMultiTransactionMain
+{
+ /**
+ * Commit the given operations as a single transaction. Create the
+ * operation instances via {@link CuratorFramework#transactionOp()}
+ *
+ * @param operations operations that make up the transaction.
+ * @return result details for foreground operations or <code>null</code> for background operations
+ * @throws Exception errors
+ */
+ List<CuratorTransactionResult> forOperations(CuratorOp... operations) throws Exception;
+
+ /**
+ * Commit the given operations as a single transaction. Create the
+ * operation instances via {@link CuratorFramework#transactionOp()}
+ *
+ * @param operations operations that make up the transaction.
+ * @return result details for foreground operations or <code>null</code> for background operations
+ * @throws Exception errors
+ */
+ List<CuratorTransactionResult> forOperations(List<CuratorOp> operations) throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorOp.java
new file mode 100644
index 0000000..23bc76c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorOp.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api.transaction;
+
+import org.apache.zookeeper.Op;
+
+/**
+ * Internal representation of a transaction operation
+ */
+public interface CuratorOp
+{
+ Op get();
+
+ TypeAndPath getTypeAndPath();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransaction.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransaction.java
index 3901abf..5d60b5c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransaction.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransaction.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.framework.api.transaction;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.ZooKeeper;
/**
@@ -49,6 +50,8 @@
* <b>Important:</b> the operations are not submitted until
* {@link CuratorTransactionFinal#commit()} is called.
* </p>
+ *
+ * @deprecated Use {@link CuratorFramework#transaction()}
*/
public interface CuratorTransaction
{
@@ -57,26 +60,26 @@
*
* @return builder object
*/
- public TransactionCreateBuilder create();
+ public TransactionCreateBuilder<CuratorTransactionBridge> create();
/**
* Start a delete builder in the transaction
*
* @return builder object
*/
- public TransactionDeleteBuilder delete();
+ public TransactionDeleteBuilder<CuratorTransactionBridge> delete();
/**
* Start a setData builder in the transaction
*
* @return builder object
*/
- public TransactionSetDataBuilder setData();
+ public TransactionSetDataBuilder<CuratorTransactionBridge> setData();
/**
* Start a check builder in the transaction
- *ChildData
+ *
* @return builder object
*/
- public TransactionCheckBuilder check();
+ public TransactionCheckBuilder<CuratorTransactionBridge> check();
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransactionResult.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransactionResult.java
index 03bbca2..8d8dc2d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransactionResult.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/CuratorTransactionResult.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.api.transaction;
import com.google.common.base.Predicate;
@@ -27,9 +28,9 @@
public class CuratorTransactionResult
{
private final OperationType type;
- private final String forPath;
- private final String resultPath;
- private final Stat resultStat;
+ private final String forPath;
+ private final String resultPath;
+ private final Stat resultStat;
/**
* Utility that can be passed to Google Guava to find a particular result. E.g.
@@ -41,7 +42,7 @@
* @param forPath path
* @return predicate
*/
- public static Predicate<CuratorTransactionResult> ofTypeAndPath(final OperationType type, final String forPath)
+ public static Predicate<CuratorTransactionResult> ofTypeAndPath(final OperationType type, final String forPath)
{
return new Predicate<CuratorTransactionResult>()
{
@@ -73,7 +74,7 @@
/**
* Returns the path that was passed to the operation when added
- *
+ *
* @return operation input path
*/
public String getForPath()
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/OperationType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/OperationType.java
index 56dcd33..c0aec68 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/OperationType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/OperationType.java
@@ -24,22 +24,22 @@
public enum OperationType
{
/**
- * {@link CuratorTransaction#create()}
+ * {@link TransactionOp#create()}
*/
CREATE,
/**
- * {@link CuratorTransaction#delete()}
+ * {@link TransactionOp#delete()}
*/
DELETE,
/**
- * {@link CuratorTransaction#setData()}
+ * {@link TransactionOp#setData()}
*/
SET_DATA,
/**
- * {@link CuratorTransaction#check()}
+ * {@link TransactionOp#check()}
*/
CHECK
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCheckBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCheckBuilder.java
index 2bc13d1..6de675c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCheckBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCheckBuilder.java
@@ -21,8 +21,8 @@
import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.Versionable;
-public interface TransactionCheckBuilder extends
- Pathable<CuratorTransactionBridge>,
- Versionable<Pathable<CuratorTransactionBridge>>
+public interface TransactionCheckBuilder<T> extends
+ Pathable<T>,
+ Versionable<Pathable<T>>
{
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCreateBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCreateBuilder.java
index 6ac3069..cba0cba 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCreateBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionCreateBuilder.java
@@ -23,10 +23,10 @@
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.PathAndBytesable;
-public interface TransactionCreateBuilder extends
- PathAndBytesable<CuratorTransactionBridge>,
- CreateModable<ACLPathAndBytesable<CuratorTransactionBridge>>,
- ACLPathAndBytesable<CuratorTransactionBridge>,
- Compressible<ACLPathAndBytesable<CuratorTransactionBridge>>
+public interface TransactionCreateBuilder<T> extends
+ PathAndBytesable<T>,
+ CreateModable<ACLPathAndBytesable<T>>,
+ ACLPathAndBytesable<T>,
+ Compressible<ACLPathAndBytesable<T>>
{
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionDeleteBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionDeleteBuilder.java
index e165394..d977290 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionDeleteBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionDeleteBuilder.java
@@ -21,8 +21,8 @@
import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.Versionable;
-public interface TransactionDeleteBuilder extends
- Pathable<CuratorTransactionBridge>,
- Versionable<Pathable<CuratorTransactionBridge>>
+public interface TransactionDeleteBuilder<T> extends
+ Pathable<T>,
+ Versionable<Pathable<T>>
{
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionOp.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionOp.java
new file mode 100644
index 0000000..84808a1
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionOp.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api.transaction;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Builds operations that can be committed as a transaction
+ * via {@link CuratorFramework#transaction()}
+ */
+public interface TransactionOp
+{
+ /**
+ * Start a create builder in the transaction
+ *
+ * @return builder object
+ */
+ TransactionCreateBuilder<CuratorOp> create();
+
+ /**
+ * Start a delete builder in the transaction
+ *
+ * @return builder object
+ */
+ TransactionDeleteBuilder<CuratorOp> delete();
+
+ /**
+ * Start a setData builder in the transaction
+ *
+ * @return builder object
+ */
+ TransactionSetDataBuilder<CuratorOp> setData();
+
+ /**
+ * Start a check builder in the transaction
+ *
+ * @return builder object
+ */
+ TransactionCheckBuilder<CuratorOp> check();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionSetDataBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionSetDataBuilder.java
index 777537a..2d4d255 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionSetDataBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TransactionSetDataBuilder.java
@@ -22,9 +22,9 @@
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.framework.api.Versionable;
-public interface TransactionSetDataBuilder extends
- PathAndBytesable<CuratorTransactionBridge>,
- Versionable<PathAndBytesable<CuratorTransactionBridge>>,
- Compressible<PathAndBytesable<CuratorTransactionBridge>>
+public interface TransactionSetDataBuilder<T> extends
+ PathAndBytesable<T>,
+ Versionable<PathAndBytesable<T>>,
+ Compressible<PathAndBytesable<T>>
{
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TypeAndPath.java b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TypeAndPath.java
new file mode 100644
index 0000000..b1cea95
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/transaction/TypeAndPath.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api.transaction;
+
+public class TypeAndPath
+{
+ private final OperationType type;
+ private final String forPath;
+
+ public TypeAndPath(OperationType type, String forPath)
+ {
+ this.type = type;
+ this.forPath = forPath;
+ }
+
+ public OperationType getType()
+ {
+ return type;
+ }
+
+ public String getForPath()
+ {
+ return forPath;
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
new file mode 100644
index 0000000..375e1f0
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.ensemble;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances.
+ */
+public class EnsembleTracker implements Closeable
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final ListenerContainer<EnsembleListener> listeners = new ListenerContainer<>();
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+ {
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Trying to reset after reconnection", e);
+ }
+ }
+ }
+ };
+
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+ {
+ reset();
+ }
+ }
+ };
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ processBackgroundResult(event);
+ }
+ };
+
+ public EnsembleTracker(CuratorFramework client)
+ {
+ this.client = client;
+ }
+
+ public void start() throws Exception
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ reset();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ listeners.clear();
+ }
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+ }
+
+ /**
+ * Return the ensemble listenable
+ *
+ * @return listenable
+ */
+ public ListenerContainer<EnsembleListener> getListenable()
+ {
+ Preconditions.checkState(state.get() != State.CLOSED, "Closed");
+
+ return listeners;
+ }
+
+ private void reset() throws Exception
+ {
+ client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+ }
+
+ private void processBackgroundResult(CuratorEvent event) throws Exception
+ {
+ switch ( event.getType() )
+ {
+ case GET_CONFIG:
+ {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ processConfigData(event.getData());
+ }
+ }
+ }
+ }
+
+ private void processConfigData(byte[] data) throws Exception
+ {
+ Properties properties = new Properties();
+ properties.load(new ByteArrayInputStream(data));
+ QuorumVerifier qv = new QuorumMaj(properties);
+ StringBuilder sb = new StringBuilder();
+ for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+ {
+ if ( sb.length() != 0 )
+ {
+ sb.append(",");
+ }
+ sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+ }
+
+ final String connectionString = sb.toString();
+ listeners.forEach
+ (
+ new Function<EnsembleListener, Void>()
+ {
+ @Override
+ public Void apply(EnsembleListener listener)
+ {
+ try
+ {
+ listener.connectionStringUpdated(connectionString);
+ }
+ catch ( Exception e )
+ {
+ log.error("Calling listener", e);
+ }
+ return null;
+ }
+ }
+ );
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java
index af8e458..f0994e3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java
@@ -46,7 +46,7 @@
public void processResult(int rc, String path, Object ctx)
{
trace.commit();
- CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null);
+ CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
},
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 7a4a96f..7184c39 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -26,7 +26,6 @@
import org.apache.curator.TimeTrace;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.*;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
import org.apache.curator.utils.ZKPaths;
@@ -72,39 +71,39 @@
protectedId = null;
}
- TransactionCreateBuilder asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+ <T> TransactionCreateBuilder<T> asTransactionCreateBuilder(final T context, final CuratorMultiTransactionRecord transaction)
{
- return new TransactionCreateBuilder()
+ return new TransactionCreateBuilder<T>()
{
@Override
- public PathAndBytesable<CuratorTransactionBridge> withACL(List<ACL> aclList)
+ public PathAndBytesable<T> withACL(List<ACL> aclList)
{
CreateBuilderImpl.this.withACL(aclList);
return this;
}
@Override
- public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode mode)
+ public ACLPathAndBytesable<T> withMode(CreateMode mode)
{
CreateBuilderImpl.this.withMode(mode);
return this;
}
@Override
- public ACLPathAndBytesable<CuratorTransactionBridge> compressed()
+ public ACLPathAndBytesable<T> compressed()
{
CreateBuilderImpl.this.compressed();
return this;
}
@Override
- public CuratorTransactionBridge forPath(String path) throws Exception
+ public T forPath(String path) throws Exception
{
return forPath(path, client.getDefaultData());
}
@Override
- public CuratorTransactionBridge forPath(String path, byte[] data) throws Exception
+ public T forPath(String path, byte[] data) throws Exception
{
if ( compress )
{
@@ -113,7 +112,7 @@
String fixedPath = client.fixForNamespace(path);
transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
- return curatorTransaction;
+ return context;
}
};
}
@@ -561,7 +560,7 @@
path = client.unfixForNamespace(path);
name = client.unfixForNamespace(name);
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.CREATE, rc, path, name, ctx, null, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.CREATE, rc, path, name, ctx, null, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorEventImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorEventImpl.java
index 929fe6d..4aa125f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorEventImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorEventImpl.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.collect.ImmutableList;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -29,16 +31,17 @@
class CuratorEventImpl implements CuratorEvent
{
- private final CuratorEventType type;
- private final int resultCode;
- private final String path;
- private final String name;
- private final List<String> children;
- private final Object context;
- private final Stat stat;
- private final byte[] data;
- private final WatchedEvent watchedEvent;
- private final List<ACL> aclList;
+ private final CuratorEventType type;
+ private final int resultCode;
+ private final String path;
+ private final String name;
+ private final List<String> children;
+ private final Object context;
+ private final Stat stat;
+ private final byte[] data;
+ private final WatchedEvent watchedEvent;
+ private final List<ACL> aclList;
+ private final List<CuratorTransactionResult> opResults;
@Override
public CuratorEventType getType()
@@ -101,6 +104,12 @@
}
@Override
+ public List<CuratorTransactionResult> getOpResults()
+ {
+ return opResults;
+ }
+
+ @Override
public String toString()
{
return "CuratorEventImpl{" +
@@ -114,20 +123,22 @@
", data=" + Arrays.toString(data) +
", watchedEvent=" + watchedEvent +
", aclList=" + aclList +
+ ", opResults=" + opResults +
'}';
}
- CuratorEventImpl(CuratorFrameworkImpl client, CuratorEventType type, int resultCode, String path, String name, Object context, Stat stat, byte[] data, List<String> children, WatchedEvent watchedEvent, List<ACL> aclList)
+ CuratorEventImpl(CuratorFrameworkImpl client, CuratorEventType type, int resultCode, String path, String name, Object context, Stat stat, byte[] data, List<String> children, WatchedEvent watchedEvent, List<ACL> aclList, List<CuratorTransactionResult> opResults)
{
this.type = type;
this.resultCode = resultCode;
+ this.opResults = (opResults != null) ? ImmutableList.copyOf(opResults) : null;
this.path = client.unfixForNamespace(path);
this.name = name;
this.context = context;
this.stat = stat;
this.data = data;
this.children = children;
- this.watchedEvent = (watchedEvent != null) ? new NamespaceWatchedEvent(client, watchedEvent) : watchedEvent;
+ this.watchedEvent = (watchedEvent != null) ? new NamespaceWatchedEvent(client, watchedEvent) : null;
this.aclList = (aclList != null) ? ImmutableList.copyOf(aclList) : null;
}
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 38ce166..900374b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -31,7 +31,9 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
@@ -104,7 +106,7 @@
@Override
public void process(WatchedEvent watchedEvent)
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
@@ -286,7 +288,7 @@
@Override
public Void apply(CuratorListener listener)
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
try
{
listener.eventReceived(CuratorFrameworkImpl.this, event);
@@ -408,6 +410,18 @@
}
@Override
+ public ReconfigBuilder reconfig()
+ {
+ return new ReconfigBuilderImpl(this);
+ }
+
+ @Override
+ public GetConfigBuilder getConfig()
+ {
+ return new GetConfigBuilderImpl(this);
+ }
+
+ @Override
public CuratorTransaction inTransaction()
{
Preconditions.checkState(getState() == CuratorFrameworkState.STARTED, "instance must be started before calling this method");
@@ -416,6 +430,22 @@
}
@Override
+ public CuratorMultiTransaction transaction()
+ {
+ Preconditions.checkState(getState() == CuratorFrameworkState.STARTED, "instance must be started before calling this method");
+
+ return new CuratorMultiTransactionImpl(this);
+ }
+
+ @Override
+ public TransactionOp transactionOp()
+ {
+ Preconditions.checkState(getState() == CuratorFrameworkState.STARTED, "instance must be started before calling this method");
+
+ return new TransactionOpImpl(this);
+ }
+
+ @Override
public Listenable<ConnectionStateListener> getConnectionStateListenable()
{
return connectionStateManager.getListenable();
@@ -837,7 +867,7 @@
if ( e instanceof CuratorConnectionLossException )
{
WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
- CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null);
+ CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
if ( checkBackgroundRetry(operationAndData, event) )
{
queueOperation(operationAndData);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
new file mode 100644
index 0000000..577b0d6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransactionMain;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.OpResult;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+public class CuratorMultiTransactionImpl implements
+ CuratorMultiTransaction,
+ CuratorMultiTransactionMain,
+ BackgroundOperation<CuratorMultiTransactionRecord>
+{
+ private final CuratorFrameworkImpl client;
+ private Backgrounding backgrounding = new Backgrounding();
+
+ public CuratorMultiTransactionImpl(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public CuratorMultiTransactionMain inBackground()
+ {
+ backgrounding = new Backgrounding(true);
+ return this;
+ }
+
+ @Override
+ public CuratorMultiTransactionMain inBackground(Object context)
+ {
+ backgrounding = new Backgrounding(context);
+ return this;
+ }
+
+ @Override
+ public CuratorMultiTransactionMain inBackground(BackgroundCallback callback)
+ {
+ backgrounding = new Backgrounding(callback);
+ return this;
+ }
+
+ @Override
+ public CuratorMultiTransactionMain inBackground(BackgroundCallback callback, Object context)
+ {
+ backgrounding = new Backgrounding(callback, context);
+ return this;
+ }
+
+ @Override
+ public CuratorMultiTransactionMain inBackground(BackgroundCallback callback, Executor executor)
+ {
+ backgrounding = new Backgrounding(callback, executor);
+ return this;
+ }
+
+ @Override
+ public CuratorMultiTransactionMain inBackground(BackgroundCallback callback, Object context, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, context, executor);
+ return this;
+ }
+
+ @Override
+ public List<CuratorTransactionResult> forOperations(CuratorOp... operations) throws Exception
+ {
+ List<CuratorOp> ops = (operations != null) ? Arrays.asList(operations) : Lists.<CuratorOp>newArrayList();
+ return forOperations(ops);
+ }
+
+ @Override
+ public List<CuratorTransactionResult> forOperations(List<CuratorOp> operations) throws Exception
+ {
+ operations = Preconditions.checkNotNull(operations, "operations cannot be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations list cannot be empty");
+
+ CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord();
+ for ( CuratorOp curatorOp : operations )
+ {
+ record.add(curatorOp.get(), curatorOp.getTypeAndPath().getType(), curatorOp.getTypeAndPath().getForPath());
+ }
+
+ if ( backgrounding.inBackground() )
+ {
+ client.processBackgroundOperation(new OperationAndData<>(this, record, backgrounding.getCallback(), null, backgrounding.getContext()), null);
+ return null;
+ }
+ else
+ {
+ return forOperationsInForeground(record);
+ }
+ }
+
+ @Override
+ public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
+ {
+ final TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
+ AsyncCallback.MultiCallback callback = new AsyncCallback.MultiCallback()
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
+ {
+ trace.commit();
+ List<CuratorTransactionResult> curatorResults = (opResults != null) ? CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData()) : null;
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, curatorResults);
+ client.processBackgroundOperation(operationAndData, event);
+ }
+ };
+ client.getZooKeeper().multi(operationAndData.getData(), callback, backgrounding.getContext());
+ }
+
+ private List<CuratorTransactionResult> forOperationsInForeground(final CuratorMultiTransactionRecord record) throws Exception
+ {
+ TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Foreground");
+ List<OpResult> responseData = RetryLoop.callWithRetry
+ (
+ client.getZookeeperClient(),
+ new Callable<List<OpResult>>()
+ {
+ @Override
+ public List<OpResult> call() throws Exception
+ {
+ return client.getZooKeeper().multi(record);
+ }
+ }
+ );
+ trace.commit();
+
+ return CuratorTransactionImpl.wrapResults(client, responseData, record);
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 1500d6d..0611df6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import org.apache.curator.framework.api.transaction.OperationType;
+import org.apache.curator.framework.api.transaction.TypeAndPath;
import org.apache.zookeeper.MultiTransactionRecord;
import org.apache.zookeeper.Op;
import java.util.List;
@@ -28,30 +29,18 @@
{
private final List<TypeAndPath> metadata = Lists.newArrayList();
- static class TypeAndPath
- {
- final OperationType type;
- final String forPath;
-
- TypeAndPath(OperationType type, String forPath)
- {
- this.type = type;
- this.forPath = forPath;
- }
- }
-
@Override
public final void add(Op op)
{
throw new UnsupportedOperationException();
}
-
+
void add(Op op, OperationType type, String forPath)
{
super.add(op);
metadata.add(new TypeAndPath(type, forPath));
}
-
+
TypeAndPath getMetadata(int index)
{
return metadata.get(index);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorTransactionImpl.java
index 13ffe82..20ec274 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorTransactionImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorTransactionImpl.java
@@ -16,21 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.api.Pathable;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
-import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
-import org.apache.curator.framework.api.transaction.OperationType;
-import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
-import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
-import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
-import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
+import org.apache.curator.framework.api.transaction.*;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
@@ -43,10 +36,10 @@
class CuratorTransactionImpl implements CuratorTransaction, CuratorTransactionBridge, CuratorTransactionFinal
{
- private final CuratorFrameworkImpl client;
- private final CuratorMultiTransactionRecord transaction;
+ private final CuratorFrameworkImpl client;
+ private final CuratorMultiTransactionRecord transaction;
- private boolean isCommitted = false;
+ private boolean isCommitted = false;
CuratorTransactionImpl(CuratorFrameworkImpl client)
{
@@ -61,49 +54,58 @@
}
@Override
- public TransactionCreateBuilder create()
+ public TransactionCreateBuilder<CuratorTransactionBridge> create()
{
Preconditions.checkState(!isCommitted, "transaction already committed");
- return new CreateBuilderImpl(client).asTransactionCreateBuilder(this, transaction);
+ CuratorTransactionBridge asBridge = this;
+ return new CreateBuilderImpl(client).asTransactionCreateBuilder(asBridge, transaction);
}
@Override
- public TransactionDeleteBuilder delete()
+ public TransactionDeleteBuilder<CuratorTransactionBridge> delete()
{
Preconditions.checkState(!isCommitted, "transaction already committed");
- return new DeleteBuilderImpl(client).asTransactionDeleteBuilder(this, transaction);
+ CuratorTransactionBridge asBridge = this;
+ return new DeleteBuilderImpl(client).asTransactionDeleteBuilder(asBridge, transaction);
}
@Override
- public TransactionSetDataBuilder setData()
+ public TransactionSetDataBuilder<CuratorTransactionBridge> setData()
{
Preconditions.checkState(!isCommitted, "transaction already committed");
- return new SetDataBuilderImpl(client).asTransactionSetDataBuilder(this, transaction);
+ CuratorTransactionBridge asBridge = this;
+ return new SetDataBuilderImpl(client).asTransactionSetDataBuilder(asBridge, transaction);
}
@Override
- public TransactionCheckBuilder check()
+ public TransactionCheckBuilder<CuratorTransactionBridge> check()
{
Preconditions.checkState(!isCommitted, "transaction already committed");
- return new TransactionCheckBuilder()
+ CuratorTransactionBridge asBridge = this;
+ return makeTransactionCheckBuilder(client, asBridge, transaction);
+ }
+
+ static <T> TransactionCheckBuilder<T> makeTransactionCheckBuilder(final CuratorFrameworkImpl client, final T context, final CuratorMultiTransactionRecord transaction)
+ {
+ return new TransactionCheckBuilder<T>()
{
- private int version = -1;
+ private int version = -1;
@Override
- public CuratorTransactionBridge forPath(String path) throws Exception
+ public T forPath(String path) throws Exception
{
- String fixedPath = client.fixForNamespace(path);
+ String fixedPath = client.fixForNamespace(path);
transaction.add(Op.check(fixedPath, version), OperationType.CHECK, path);
- return CuratorTransactionImpl.this;
+ return context;
}
@Override
- public Pathable<CuratorTransactionBridge> withVersion(int version)
+ public Pathable<T> withVersion(int version)
{
this.version = version;
return this;
@@ -118,65 +120,44 @@
isCommitted = true;
final AtomicBoolean firstTime = new AtomicBoolean(true);
- List<OpResult> resultList = RetryLoop.callWithRetry
- (
- client.getZookeeperClient(),
- new Callable<List<OpResult>>()
- {
- @Override
- public List<OpResult> call() throws Exception
+ List<OpResult> resultList = RetryLoop.callWithRetry
+ (
+ client.getZookeeperClient(),
+ new Callable<List<OpResult>>()
{
- return doOperation(firstTime);
+ @Override
+ public List<OpResult> call() throws Exception
+ {
+ return doOperation(firstTime);
+ }
}
- }
- );
-
+ );
+
if ( resultList.size() != transaction.metadataSize() )
{
throw new IllegalStateException(String.format("Result size (%d) doesn't match input size (%d)", resultList.size(), transaction.metadataSize()));
}
- ImmutableList.Builder<CuratorTransactionResult> builder = ImmutableList.builder();
+ return wrapResults(client, resultList, transaction);
+ }
+
+ static List<CuratorTransactionResult> wrapResults(CuratorFrameworkImpl client, List<OpResult> resultList, CuratorMultiTransactionRecord transaction)
+ {
+ ImmutableList.Builder<CuratorTransactionResult> builder = ImmutableList.builder();
for ( int i = 0; i < resultList.size(); ++i )
{
- OpResult opResult = resultList.get(i);
- CuratorMultiTransactionRecord.TypeAndPath metadata = transaction.getMetadata(i);
- CuratorTransactionResult curatorResult = makeCuratorResult(opResult, metadata);
+ OpResult opResult = resultList.get(i);
+ TypeAndPath metadata = transaction.getMetadata(i);
+ CuratorTransactionResult curatorResult = makeCuratorResult(client, opResult, metadata);
builder.add(curatorResult);
}
return builder.build();
}
- private List<OpResult> doOperation(AtomicBoolean firstTime) throws Exception
+ static CuratorTransactionResult makeCuratorResult(CuratorFrameworkImpl client, OpResult opResult, TypeAndPath metadata)
{
- boolean localFirstTime = firstTime.getAndSet(false);
- if ( !localFirstTime )
- {
-
- }
-
- List<OpResult> opResults = client.getZooKeeper().multi(transaction);
- if ( opResults.size() > 0 )
- {
- OpResult firstResult = opResults.get(0);
- if ( firstResult.getType() == ZooDefs.OpCode.error )
- {
- OpResult.ErrorResult error = (OpResult.ErrorResult)firstResult;
- KeeperException.Code code = KeeperException.Code.get(error.getErr());
- if ( code == null )
- {
- code = KeeperException.Code.UNIMPLEMENTED;
- }
- throw KeeperException.create(code);
- }
- }
- return opResults;
- }
-
- private CuratorTransactionResult makeCuratorResult(OpResult opResult, CuratorMultiTransactionRecord.TypeAndPath metadata)
- {
- String resultPath = null;
+ String resultPath = null;
Stat resultStat = null;
switch ( opResult.getType() )
{
@@ -188,19 +169,45 @@
case ZooDefs.OpCode.create:
{
- OpResult.CreateResult createResult = (OpResult.CreateResult)opResult;
+ OpResult.CreateResult createResult = (OpResult.CreateResult)opResult;
resultPath = client.unfixForNamespace(createResult.getPath());
break;
}
case ZooDefs.OpCode.setData:
{
- OpResult.SetDataResult setDataResult = (OpResult.SetDataResult)opResult;
+ OpResult.SetDataResult setDataResult = (OpResult.SetDataResult)opResult;
resultStat = setDataResult.getStat();
break;
}
}
- return new CuratorTransactionResult(metadata.type, metadata.forPath, resultPath, resultStat);
+ return new CuratorTransactionResult(metadata.getType(), metadata.getForPath(), resultPath, resultStat);
+ }
+
+ private List<OpResult> doOperation(AtomicBoolean firstTime) throws Exception
+ {
+ boolean localFirstTime = firstTime.getAndSet(false);
+ if ( !localFirstTime )
+ {
+ // TODO
+ }
+
+ List<OpResult> opResults = client.getZooKeeper().multi(transaction);
+ if ( opResults.size() > 0 )
+ {
+ OpResult firstResult = opResults.get(0);
+ if ( firstResult.getType() == ZooDefs.OpCode.error )
+ {
+ OpResult.ErrorResult error = (OpResult.ErrorResult)firstResult;
+ KeeperException.Code code = KeeperException.Code.get(error.getErr());
+ if ( code == null )
+ {
+ code = KeeperException.Code.UNIMPLEMENTED;
+ }
+ throw KeeperException.create(code);
+ }
+ }
+ return opResults;
}
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 9db0013..51641b8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -28,7 +28,6 @@
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.Pathable;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
import org.apache.curator.utils.ZKPaths;
@@ -55,20 +54,20 @@
guaranteed = false;
}
- TransactionDeleteBuilder asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+ <T> TransactionDeleteBuilder<T> asTransactionDeleteBuilder(final T context, final CuratorMultiTransactionRecord transaction)
{
- return new TransactionDeleteBuilder()
+ return new TransactionDeleteBuilder<T>()
{
@Override
- public CuratorTransactionBridge forPath(String path) throws Exception
+ public T forPath(String path) throws Exception
{
String fixedPath = client.fixForNamespace(path);
transaction.add(Op.delete(fixedPath, version), OperationType.DELETE, path);
- return curatorTransaction;
+ return context;
}
@Override
- public Pathable<CuratorTransactionBridge> withVersion(int version)
+ public Pathable<T> withVersion(int version)
{
DeleteBuilderImpl.this.withVersion(version);
return this;
@@ -159,7 +158,7 @@
}
else
{
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index d4a059d..345bcf5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -131,7 +131,7 @@
public void processResult(int rc, String path, Object ctx, Stat stat)
{
trace.commit();
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
};
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
new file mode 100644
index 0000000..7a5db69
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TypeAndPath;
+import org.apache.zookeeper.Op;
+
+class ExtractingCuratorOp implements CuratorOp
+{
+ private final CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord();
+
+ CuratorMultiTransactionRecord getRecord()
+ {
+ return record;
+ }
+
+ @Override
+ public TypeAndPath getTypeAndPath()
+ {
+ validate();
+ return record.getMetadata(0);
+ }
+
+ @Override
+ public Op get()
+ {
+ validate();
+ return record.iterator().next();
+ }
+
+ private void validate()
+ {
+ Preconditions.checkArgument(record.size() > 0, "No operation has been added");
+ Preconditions.checkArgument(record.size() == 1, "Multiple operations added");
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
index 250c2c8..f65c933 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
@@ -104,7 +104,7 @@
public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat)
{
trace.commit();
- CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl);
+ CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl, null);
client.processBackgroundOperation(operationAndData, event);
}
};
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 16f6d4b..03010ce 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -162,7 +162,7 @@
{
strings = Lists.newArrayList();
}
- CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null);
+ CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
};
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
new file mode 100644
index 0000000..a837809
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundStatable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Ensembleable;
+import org.apache.curator.framework.api.GetConfigBuilder;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperation<Void>
+{
+ private final CuratorFrameworkImpl client;
+
+ private Backgrounding backgrounding;
+ private Watching watching;
+ private Stat stat;
+
+ public GetConfigBuilderImpl(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ backgrounding = new Backgrounding();
+ watching = new Watching();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ this.stat = stat;
+ return this;
+ }
+
+ @Override
+ public BackgroundStatable<Ensembleable<byte[]>> watched()
+ {
+ watching = new Watching(true);
+ return this;
+ }
+
+ @Override
+ public GetConfigBuilder usingWatcher(Watcher watcher)
+ {
+ watching = new Watching(client, watcher);
+ return this;
+ }
+
+ @Override
+ public GetConfigBuilder usingWatcher(final CuratorWatcher watcher)
+ {
+ watching = new Watching(client, watcher);
+ return this;
+ }
+
+ @Override
+ public Ensembleable<byte[]> inBackground()
+ {
+ backgrounding = new Backgrounding(true);
+ return this;
+ }
+
+ @Override
+ public Ensembleable<byte[]> inBackground(Object context)
+ {
+ backgrounding = new Backgrounding(context);
+ return this;
+ }
+
+ @Override
+ public Ensembleable<byte[]> inBackground(BackgroundCallback callback)
+ {
+ backgrounding = new Backgrounding(callback);
+ return this;
+ }
+
+ @Override
+ public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context)
+ {
+ backgrounding = new Backgrounding(callback, context);
+ return this;
+ }
+
+ @Override
+ public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Executor executor)
+ {
+ backgrounding = new Backgrounding(callback, executor);
+ return this;
+ }
+
+ @Override
+ public Ensembleable<byte[]> inBackground(BackgroundCallback callback, Object context, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, context, executor);
+ return this;
+ }
+
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ if ( backgrounding.inBackground() )
+ {
+ client.processBackgroundOperation(new OperationAndData<Void>(this, null, backgrounding.getCallback(), null, backgrounding.getContext()), null);
+ return null;
+ }
+ else
+ {
+ return configInForeground();
+ }
+ }
+
+ @Override
+ public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception
+ {
+ final TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background");
+ AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback()
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
+ {
+ trace.commit();
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null);
+ client.processBackgroundOperation(operationAndData, event);
+ }
+ };
+ if ( watching.isWatched() )
+ {
+ client.getZooKeeper().getConfig(true, callback, backgrounding.getContext());
+ }
+ else
+ {
+ client.getZooKeeper().getConfig(watching.getWatcher(), callback, backgrounding.getContext());
+ }
+ }
+
+ private byte[] configInForeground() throws Exception
+ {
+ TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground");
+ try
+ {
+ return RetryLoop.callWithRetry
+ (
+ client.getZookeeperClient(),
+ new Callable<byte[]>()
+ {
+ @Override
+ public byte[] call() throws Exception
+ {
+ if ( watching.isWatched() )
+ {
+ return client.getZooKeeper().getConfig(true, stat);
+ }
+ return client.getZooKeeper().getConfig(watching.getWatcher(), stat);
+ }
+ }
+ );
+ }
+ finally
+ {
+ trace.commit();
+ }
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index e994b03..23da075 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -250,7 +250,7 @@
rc = KeeperException.Code.DATAINCONSISTENCY.intValue();
}
}
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_DATA, rc, path, null, ctx, stat, data, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_DATA, rc, path, null, ctx, stat, data, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
};
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
new file mode 100644
index 0000000..0efa481
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.api.*;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+public class ReconfigBuilderImpl implements
+ ReconfigBuilder,
+ ReconfigBuilderMain,
+ StatEnsembleable<byte[]>,
+ Configurable,
+ StatConfigEnsembleable,
+ BackgroundOperation<Void>
+{
+ private final CuratorFrameworkImpl client;
+
+ private Backgrounding backgrounding = new Backgrounding();
+ private Stat responseStat;
+ private long fromConfig = -1;
+ private List<String> adding;
+ private List<String> joining;
+ private List<String> leaving;
+
+ public ReconfigBuilderImpl(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ if ( backgrounding.inBackground() )
+ {
+ client.processBackgroundOperation(new OperationAndData<>(this, null, backgrounding.getCallback(), null, backgrounding.getContext()), null);
+ return new byte[0];
+ }
+ else
+ {
+ return ensembleInForeground();
+ }
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ responseStat = stat;
+ return this;
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ fromConfig = config;
+ return this;
+ }
+
+ @Override
+ public JoinLeaveStatConfigEnsembleable adding(String... server)
+ {
+ return adding((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public JoinLeaveStatConfigEnsembleable adding(List<String> servers)
+ {
+ this.adding = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
+
+ return new JoinLeaveStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public LeaveStatConfigEnsembleable joining(String... server)
+ {
+ return joining((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public LeaveStatConfigEnsembleable joining(List<String> servers)
+ {
+ return new LeaveStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public StatConfigEnsembleable leaving(List<String> servers)
+ {
+ return ReconfigBuilderImpl.this.leaving(servers);
+ }
+
+ @Override
+ public StatConfigEnsembleable leaving(String... server)
+ {
+ return ReconfigBuilderImpl.this.leaving(server);
+ }
+ };
+ }
+
+ @Override
+ public JoinStatConfigEnsembleable leaving(String... server)
+ {
+ return leaving((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public JoinStatConfigEnsembleable leaving(List<String> servers)
+ {
+ return new JoinStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public StatConfigEnsembleable joining(List<String> servers)
+ {
+ return ReconfigBuilderImpl.this.joining(servers);
+ }
+
+ @Override
+ public StatConfigEnsembleable joining(String... server)
+ {
+ return ReconfigBuilderImpl.this.joining(server);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public ReconfigBuilderMain inBackground()
+ {
+ backgrounding = new Backgrounding(true);
+ return this;
+ }
+
+ @Override
+ public ReconfigBuilderMain inBackground(Object context)
+ {
+ backgrounding = new Backgrounding(context);
+ return this;
+ }
+
+ @Override
+ public ReconfigBuilderMain inBackground(BackgroundCallback callback)
+ {
+ backgrounding = new Backgrounding(callback);
+ return this;
+ }
+
+ @Override
+ public ReconfigBuilderMain inBackground(BackgroundCallback callback, Object context)
+ {
+ backgrounding = new Backgrounding(callback, context);
+ return this;
+ }
+
+ @Override
+ public ReconfigBuilderMain inBackground(BackgroundCallback callback, Executor executor)
+ {
+ backgrounding = new Backgrounding(callback, executor);
+ return this;
+ }
+
+ @Override
+ public ReconfigBuilderMain inBackground(BackgroundCallback callback, Object context, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, context, executor);
+ return this;
+ }
+
+ @Override
+ public LeaveAddStatConfigEnsembleable joining(String... server)
+ {
+ return joining((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public LeaveAddStatConfigEnsembleable joining(List<String> servers)
+ {
+ joining = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
+
+ return new LeaveAddStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public LeaveStatConfigEnsembleable adding(String... server)
+ {
+ return adding((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public LeaveStatConfigEnsembleable adding(List<String> servers)
+ {
+ return new LeaveStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public StatConfigEnsembleable leaving(List<String> servers)
+ {
+ return ReconfigBuilderImpl.this.leaving(servers);
+ }
+
+ @Override
+ public StatConfigEnsembleable leaving(String... server)
+ {
+ return ReconfigBuilderImpl.this.leaving(server);
+ }
+ };
+ }
+
+ @Override
+ public AddStatConfigEnsembleable leaving(String... server)
+ {
+ return leaving((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public AddStatConfigEnsembleable leaving(List<String> servers)
+ {
+ return new AddStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public StatConfigEnsembleable adding(List<String> servers)
+ {
+ return ReconfigBuilderImpl.this.adding(servers);
+ }
+
+ @Override
+ public StatConfigEnsembleable adding(String... server)
+ {
+ return ReconfigBuilderImpl.this.adding(server);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public JoinAddStatConfigEnsembleable leaving(String... server)
+ {
+ return leaving((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public JoinAddStatConfigEnsembleable leaving(List<String> servers)
+ {
+ leaving = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.<String>of();
+
+ return new JoinAddStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public JoinStatConfigurable adding(String... server)
+ {
+ return adding((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public JoinStatConfigurable adding(List<String> servers)
+ {
+ return new JoinStatConfigurable()
+ {
+ @Override
+ public Configurable joining(List<String> servers)
+ {
+ return ReconfigBuilderImpl.this.joining(servers);
+ }
+
+ @Override
+ public Configurable joining(String... server)
+ {
+ return ReconfigBuilderImpl.this.joining(server);
+ }
+ };
+ }
+
+ @Override
+ public AddStatConfigEnsembleable joining(String... server)
+ {
+ return joining((server != null) ? Arrays.asList(server) : null);
+ }
+
+ @Override
+ public AddStatConfigEnsembleable joining(List<String> servers)
+ {
+ return new AddStatConfigEnsembleable()
+ {
+ @Override
+ public byte[] forEnsemble() throws Exception
+ {
+ return ReconfigBuilderImpl.this.forEnsemble();
+ }
+
+ @Override
+ public Ensembleable<byte[]> storingStatIn(Stat stat)
+ {
+ return ReconfigBuilderImpl.this.storingStatIn(stat);
+ }
+
+ @Override
+ public StatEnsembleable<byte[]> fromConfig(long config) throws Exception
+ {
+ return ReconfigBuilderImpl.this.fromConfig(config);
+ }
+
+ @Override
+ public StatConfigEnsembleable adding(List<String> servers)
+ {
+ return ReconfigBuilderImpl.this.adding(servers);
+ }
+
+ @Override
+ public StatConfigEnsembleable adding(String... server)
+ {
+ return ReconfigBuilderImpl.this.adding(server);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public void performBackgroundOperation(final OperationAndData<Void> data) throws Exception
+ {
+ final TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Background");
+ AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback()
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat)
+ {
+ trace.commit();
+ if ( (responseStat != null) && (stat != null) )
+ {
+ DataTree.copyStat(stat, responseStat);
+ }
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.RECONFIG, rc, path, null, ctx, stat, bytes, null, null, null, null);
+ client.processBackgroundOperation(data, event);
+ }
+ };
+ client.getZooKeeper().reconfig(joining, leaving, adding, fromConfig, callback, backgrounding.getContext());
+ }
+
+ private byte[] ensembleInForeground() throws Exception
+ {
+ TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground");
+ byte[] responseData = RetryLoop.callWithRetry
+ (
+ client.getZookeeperClient(),
+ new Callable<byte[]>()
+ {
+ @Override
+ public byte[] call() throws Exception
+ {
+ return client.getZooKeeper().reconfig(joining, leaving, adding, fromConfig, responseStat);
+ }
+ }
+ );
+ trace.commit();
+ return responseData;
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
index f7b2480..17e88f8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
@@ -138,7 +138,7 @@
public void processResult(int rc, String path, Object ctx, Stat stat)
{
trace.commit();
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_ACL, rc, path, null, ctx, stat, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_ACL, rc, path, null, ctx, stat, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
},
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
index 8e93cbf..3ea704c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
@@ -20,7 +20,6 @@
import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
-import org.apache.curator.framework.api.ACLPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.CuratorEvent;
@@ -28,13 +27,11 @@
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.framework.api.SetDataBackgroundVersionable;
import org.apache.curator.framework.api.SetDataBuilder;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.data.Stat;
-
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@@ -53,12 +50,12 @@
compress = false;
}
- TransactionSetDataBuilder asTransactionSetDataBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+ <T> TransactionSetDataBuilder<T> asTransactionSetDataBuilder(final T context, final CuratorMultiTransactionRecord transaction)
{
- return new TransactionSetDataBuilder()
+ return new TransactionSetDataBuilder<T>()
{
@Override
- public CuratorTransactionBridge forPath(String path, byte[] data) throws Exception
+ public T forPath(String path, byte[] data) throws Exception
{
if ( compress )
{
@@ -67,26 +64,26 @@
String fixedPath = client.fixForNamespace(path);
transaction.add(Op.setData(fixedPath, data, version), OperationType.SET_DATA, path);
- return curatorTransaction;
+ return context;
}
@Override
- public CuratorTransactionBridge forPath(String path) throws Exception
+ public T forPath(String path) throws Exception
{
return forPath(path, client.getDefaultData());
}
@Override
- public PathAndBytesable<CuratorTransactionBridge> withVersion(int version)
+ public PathAndBytesable<T> withVersion(int version)
{
SetDataBuilderImpl.this.withVersion(version);
return this;
}
@Override
- public PathAndBytesable<CuratorTransactionBridge> compressed() {
+ public PathAndBytesable<T> compressed()
+ {
compress = true;
-
return this;
}
};
@@ -219,7 +216,7 @@
public void processResult(int rc, String path, Object ctx, Stat stat)
{
trace.commit();
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
},
@@ -246,7 +243,7 @@
Stat resultStat = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null);
+ client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
index 2d3e9c0..cab31ae 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
@@ -93,7 +93,7 @@
public void processResult(int rc, String path, Object ctx)
{
trace.commit();
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, path, ctx, null, null, null, null, null);
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, path, ctx, null, null, null, null, null, null);
client.processBackgroundOperation(operationAndData, event);
}
};
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/TransactionOpImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/TransactionOpImpl.java
new file mode 100644
index 0000000..381842b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/TransactionOpImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
+import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
+
+public class TransactionOpImpl implements TransactionOp
+{
+ private final CuratorFrameworkImpl client;
+
+ public TransactionOpImpl(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public TransactionCreateBuilder<CuratorOp> create()
+ {
+ ExtractingCuratorOp op = new ExtractingCuratorOp();
+ return new CreateBuilderImpl(client).<CuratorOp>asTransactionCreateBuilder(op, op.getRecord());
+ }
+
+ @Override
+ public TransactionDeleteBuilder<CuratorOp> delete()
+ {
+ ExtractingCuratorOp op = new ExtractingCuratorOp();
+ return new DeleteBuilderImpl(client).<CuratorOp>asTransactionDeleteBuilder(op, op.getRecord());
+ }
+
+ @Override
+ public TransactionSetDataBuilder<CuratorOp> setData()
+ {
+ ExtractingCuratorOp op = new ExtractingCuratorOp();
+ return new SetDataBuilderImpl(client).<CuratorOp>asTransactionSetDataBuilder(op, op.getRecord());
+ }
+
+ @Override
+ public TransactionCheckBuilder<CuratorOp> check()
+ {
+ ExtractingCuratorOp op = new ExtractingCuratorOp();
+ return CuratorTransactionImpl.<CuratorOp>makeTransactionCheckBuilder(client, op, op.getRecord());
+ }
+
+}
diff --git a/curator-framework/src/site/confluence/index.confluence b/curator-framework/src/site/confluence/index.confluence
index efde81b..b79ece4 100644
--- a/curator-framework/src/site/confluence/index.confluence
+++ b/curator-framework/src/site/confluence/index.confluence
@@ -42,7 +42,12 @@
|getData()|Begins an operation to get a ZNode's data. Call additional methods (watch, background or get stat) and finalize the operation by calling forPath()|
|setData()|Begins an operation to set a ZNode's data. Call additional methods (version or background) and finalize the operation by calling forPath()|
|getChildren()|Begins an operation to get a ZNode's list of children ZNodes. Call additional methods (watch, background or get stat) and finalize the operation by calling forPath()|
-|inTransaction()|Begins an atomic ZooKeeper transaction. Combine create, setData, check, and/or delete operations and then commit() as a unit.|
+|transactionOp()|Used to allocate operations to be used with transaction().|
+|transaction()|Atomically submit a set of operations as a transaction.|
+|getACL()|Begins an operation to return a ZNode's ACL settings. Call additional methods and finalize the operation by calling forPath()|
+|setACL()|Begins an operation to set a ZNode's ACL settings. Call additional methods and finalize the operation by calling forPath()|
+|getConfig()|Begins an operation to return the last committed configuration. Call additional methods and finalize the operation by calling forEnsemble()|
+|reconfig()|Begins an operation to change the configuration. Call additional methods and finalize the operation by calling forEnsemble()|
h3. Notifications
Notifications for background operations and watches are published via the ClientListener interface. You register listeners with the
@@ -59,10 +64,16 @@
|CREATE|getResultCode() and getPath()|
|DELETE|getResultCode() and getPath()|
|EXISTS|getResultCode(), getPath() and getStat()|
-|GET_DATA|getResultCode(), getPath(), getStat() and getData()|
-|SET_DATA|getResultCode(), getPath() and getStat()|
+|GET\_DATA|getResultCode(), getPath(), getStat() and getData()|
+|SET\_DATA|getResultCode(), getPath() and getStat()|
|CHILDREN|getResultCode(), getPath(), getStat(), getChildren()|
+|SYNC|getResultCode(), getStat()|
+|GET\_ACL|getResultCode(), getACLList()|
+|SET\_ACL|getResultCode()|
+|TRANSACTION|getResultCode(), getOpResults()|
|WATCHED|getWatchedEvent()|
+|GET\_CONFIG|getResultCode(), getData()|
+|RECONFIG|getResultCode(), getData()|
h2. Namespaces
Because a ZooKeeper cluster is a shared environment, it's vital that a namespace convention is
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransaction.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java
similarity index 78%
copy from curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransaction.java
copy to curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java
index c18af99..d302119 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransaction.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java
@@ -18,18 +18,16 @@
*/
package org.apache.curator.framework.imps;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CompressionProvider;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestCompressionInTransaction extends BaseClassForTests
+public class TestCompressionInTransactionNew extends BaseClassForTests
{
@Test
public void testSetData() throws Exception
@@ -43,11 +41,13 @@
client.start();
//Create uncompressed data in a transaction
- client.inTransaction().create().forPath(path, data).and().commit();
+ CuratorOp op = client.transactionOp().create().forPath(path, data);
+ client.transaction().forOperations(op);
Assert.assertEquals(data, client.getData().forPath(path));
//Create compressed data in transaction
- client.inTransaction().setData().compressed().forPath(path, data).and().commit();
+ op = client.transactionOp().setData().compressed().forPath(path, data);
+ client.transaction().forOperations(op);
Assert.assertEquals(data, client.getData().decompressed().forPath(path));
}
finally
@@ -71,16 +71,18 @@
client.start();
//Create the nodes
- client.inTransaction().create().compressed().forPath(path1).and().
- create().forPath(path2).and().commit();
+ CuratorOp op1 = client.transactionOp().create().compressed().forPath(path1);
+ CuratorOp op2 = client.transactionOp().create().forPath(path2);
+ client.transaction().forOperations(op1, op2);
//Check they exist
Assert.assertNotNull(client.checkExists().forPath(path1));
Assert.assertNotNull(client.checkExists().forPath(path2));
//Set the nodes, path1 compressed, path2 uncompressed.
- client.inTransaction().setData().compressed().forPath(path1, data1).and().
- setData().forPath(path2, data2).and().commit();
+ op1 = client.transactionOp().setData().compressed().forPath(path1, data1);
+ op2 = client.transactionOp().setData().forPath(path2, data2);
+ client.transaction().forOperations(op1, op2);
Assert.assertNotEquals(data1, client.getData().forPath(path1));
Assert.assertEquals(data1, client.getData().decompressed().forPath(path1));
@@ -107,8 +109,10 @@
{
client.start();
- client.inTransaction().create().compressed().forPath(path1, data1).and().
- create().compressed().forPath(path2, data2).and().commit();
+ CuratorOp op1 = client.transactionOp().create().compressed().forPath(path1, data1);
+ CuratorOp op2 = client.transactionOp().create().compressed().forPath(path2, data2);
+
+ client.transaction().forOperations(op1, op2);
Assert.assertNotEquals(data1, client.getData().forPath(path1));
Assert.assertEquals(data1, client.getData().decompressed().forPath(path1));
@@ -141,8 +145,9 @@
{
client.start();
- client.inTransaction().create().compressed().forPath(path1, data1).and().
- create().forPath(path2, data2).and().commit();
+ CuratorOp op1 = client.transactionOp().create().compressed().forPath(path1, data1);
+ CuratorOp op2 = client.transactionOp().create().forPath(path2, data2);
+ client.transaction().forOperations(op1, op2);
Assert.assertNotEquals(data1, client.getData().forPath(path1));
Assert.assertEquals(data1, client.getData().decompressed().forPath(path1));
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransaction.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java
similarity index 97%
rename from curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransaction.java
rename to curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java
index c18af99..ebf591b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransaction.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java
@@ -29,7 +29,8 @@
import java.util.concurrent.atomic.AtomicInteger;
-public class TestCompressionInTransaction extends BaseClassForTests
+@SuppressWarnings("deprecation")
+public class TestCompressionInTransactionOld extends BaseClassForTests
{
@Test
public void testSetData() throws Exception
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
new file mode 100644
index 0000000..133e690
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.ensemble.EnsembleListener;
+import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.ensemble.EnsembleTracker;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestReconfiguration
+{
+ private TestingCluster cluster;
+ private DynamicEnsembleProvider dynamicEnsembleProvider;
+ private WaitOnDelegateListener waitOnDelegateListener;
+ private EnsembleTracker ensembleTracker;
+ private CuratorFramework client;
+
+ private String connectionString1to5;
+ private String connectionString2to5;
+ private String connectionString3to5;
+
+ @BeforeMethod
+ public void setup() throws Exception
+ {
+ cluster = new TestingCluster(5);
+ cluster.start();
+
+ connectionString1to5 = cluster.getConnectString();
+ connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5);
+ connectionString3to5 = getConnectionString(cluster, 3, 4, 5);
+
+ dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5);
+ client = CuratorFrameworkFactory.builder()
+ .ensembleProvider(dynamicEnsembleProvider)
+ .retryPolicy(new RetryOneTime(1))
+ .build();
+ client.start();
+ client.blockUntilConnected();
+
+ //Wrap around the dynamic ensemble provider, so that we can wait until it has received the event.
+ waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider);
+ ensembleTracker = new EnsembleTracker(client);
+ ensembleTracker.getListenable().addListener(waitOnDelegateListener);
+ ensembleTracker.start();
+ //Wait for the initial event.
+ waitOnDelegateListener.waitForEvent();
+ }
+
+ @AfterMethod
+ public void tearDown() throws IOException
+ {
+ CloseableUtils.closeQuietly(ensembleTracker);
+ CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(cluster);
+ }
+
+ @Test
+ public void testSyncIncremental() throws Exception
+ {
+ Stat stat = new Stat();
+ byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
+ Assert.assertNotNull(bytes);
+ QuorumVerifier qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+
+ //Remove Servers
+ bytes = client.reconfig().leaving("1").fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig().leaving("2").fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+
+ //Add Servers
+ bytes = client.reconfig().joining("server.2=" + server2).fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig().joining("server.1=" + server1).fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+ }
+
+ @Test
+ public void testAsyncIncremental() throws Exception
+ {
+ final AtomicReference<byte[]> bytes = new AtomicReference<>();
+ final BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ bytes.set(event.getData());
+ //We only need the latch on getConfig.
+ if ( event.getContext() != null )
+ {
+ ((CountDownLatch)event.getContext()).countDown();
+ }
+ }
+
+ };
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getConfig().inBackground(callback, latch).forEnsemble();
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(bytes.get());
+ QuorumVerifier qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+
+ //Remove Servers
+ client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig().inBackground(callback, latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ //Add Servers
+ client.reconfig().inBackground(callback, latch).joining("server.2=" + server2).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig().inBackground(callback, latch).joining("server.1=" + server1).fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ }
+
+ @Test
+ public void testSyncNonIncremental() throws Exception
+ {
+ Stat stat = new Stat();
+ byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
+ Assert.assertNotNull(bytes);
+ QuorumVerifier qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+ String server3 = getServerString(qv, cluster, 3L);
+ String server4 = getServerString(qv, cluster, 4L);
+ String server5 = getServerString(qv, cluster, 5L);
+
+ //Remove Servers
+ bytes = client.reconfig()
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig()
+ .adding("server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+
+ //Add Servers
+ bytes = client.reconfig()
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+
+ bytes = client.reconfig()
+ .adding("server.1=" + server1,
+ "server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ qv = getQuorumVerifier(bytes);
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+ }
+
+ @Test
+ public void testAsyncNonIncremental() throws Exception
+ {
+ final AtomicReference<byte[]> bytes = new AtomicReference<>();
+ final BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ bytes.set(event.getData());
+ ((CountDownLatch)event.getContext()).countDown();
+ }
+
+ };
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getConfig().inBackground(callback, latch).forEnsemble();
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(bytes.get());
+ QuorumVerifier qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ String server1 = getServerString(qv, cluster, 1L);
+ String server2 = getServerString(qv, cluster, 2L);
+ String server3 = getServerString(qv, cluster, 3L);
+ String server4 = getServerString(qv, cluster, 4L);
+ String server5 = getServerString(qv, cluster, 5L);
+
+ //Remove Servers
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 3);
+
+ //Add Servers
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 4);
+
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.1=" + server1,
+ "server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
+ waitOnDelegateListener.waitForEvent();
+ Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
+ qv = getQuorumVerifier(bytes.get());
+ Assert.assertEquals(qv.getAllMembers().size(), 5);
+ }
+
+ static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception
+ {
+ Properties properties = new Properties();
+ properties.load(new StringReader(new String(bytes)));
+ return new QuorumMaj(properties);
+ }
+
+ static InstanceSpec getInstance(TestingCluster cluster, int id)
+ {
+ for ( InstanceSpec spec : cluster.getInstances() )
+ {
+ if ( spec.getServerId() == id )
+ {
+ return spec;
+ }
+ }
+ throw new IllegalStateException("InstanceSpec with id:" + id + " not found");
+ }
+
+ static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception
+ {
+ String str = qv.getAllMembers().get(id).toString();
+ //check if connection string is already there.
+ if ( str.contains(";") )
+ {
+ return str;
+ }
+ else
+ {
+ return str + ";" + getInstance(cluster, (int)id).getConnectString();
+ }
+ }
+
+ static String getConnectionString(TestingCluster cluster, long... ids) throws Exception
+ {
+ StringBuilder sb = new StringBuilder();
+ Map<Long, InstanceSpec> specs = new HashMap<>();
+ for ( InstanceSpec spec : cluster.getInstances() )
+ {
+ specs.put((long)spec.getServerId(), spec);
+ }
+ for ( long id : ids )
+ {
+ if ( sb.length() != 0 )
+ {
+ sb.append(",");
+ }
+ sb.append(specs.get(id).getConnectString());
+ }
+ return sb.toString();
+ }
+
+ //Simple EnsembleListener that can wait until the delegate handles the event.
+ private static class WaitOnDelegateListener implements EnsembleListener
+ {
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ private final EnsembleListener delegate;
+
+ private WaitOnDelegateListener(EnsembleListener delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void connectionStringUpdated(String connectionString)
+ {
+ delegate.connectionStringUpdated(connectionString);
+ latch.countDown();
+ }
+
+ public void waitForEvent() throws InterruptedException, TimeoutException
+ {
+ if ( latch.await(5, TimeUnit.SECONDS) )
+ {
+ latch = new CountDownLatch(1);
+ }
+ else
+ {
+ throw new TimeoutException("Failed to receive event in time.");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactionsNew.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactionsNew.java
new file mode 100644
index 0000000..eaf94f8
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactionsNew.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.framework.api.transaction.OperationType;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestTransactionsNew extends BaseClassForTests
+{
+ @Test
+ public void testCheckVersion() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath("/foo");
+ Stat stat = client.setData().forPath("/foo", "new".getBytes()); // up the version
+
+ CuratorOp statOp = client.transactionOp().check().withVersion(stat.getVersion() + 1).forPath("/foo");
+ CuratorOp createOp = client.transactionOp().create().forPath("/bar");
+ try
+ {
+ client.transaction().forOperations(statOp, createOp);
+ Assert.fail();
+ }
+ catch ( KeeperException.BadVersionException correct )
+ {
+ // correct
+ }
+
+ Assert.assertNull(client.checkExists().forPath("/bar"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testWithNamespace() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace("galt").build();
+ try
+ {
+ client.start();
+ CuratorOp createOp1 = client.transactionOp().create().forPath("/foo", "one".getBytes());
+ CuratorOp createOp2 = client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-", "one".getBytes());
+ CuratorOp setDataOp = client.transactionOp().setData().forPath("/foo", "two".getBytes());
+ CuratorOp createOp3 = client.transactionOp().create().forPath("/foo/bar");
+ CuratorOp deleteOp = client.transactionOp().delete().forPath("/foo/bar");
+
+ Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp1, createOp2, setDataOp, createOp3, deleteOp);
+
+ Assert.assertTrue(client.checkExists().forPath("/foo") != null);
+ Assert.assertTrue(client.usingNamespace(null).checkExists().forPath("/galt/foo") != null);
+ Assert.assertEquals(client.getData().forPath("/foo"), "two".getBytes());
+ Assert.assertTrue(client.checkExists().forPath("/foo/bar") == null);
+
+ CuratorTransactionResult ephemeralResult = Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
+ Assert.assertNotNull(ephemeralResult);
+ Assert.assertNotEquals(ephemeralResult.getResultPath(), "/test-");
+ Assert.assertTrue(ephemeralResult.getResultPath().startsWith("/test-"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ CuratorOp createOp1 = client.transactionOp().create().forPath("/foo");
+ CuratorOp createOp2 = client.transactionOp().create().forPath("/foo/bar", "snafu".getBytes());
+
+ Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp1, createOp2);
+
+ Assert.assertTrue(client.checkExists().forPath("/foo/bar") != null);
+ Assert.assertEquals(client.getData().forPath("/foo/bar"), "snafu".getBytes());
+
+ CuratorTransactionResult fooResult = Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
+ CuratorTransactionResult fooBarResult = Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
+ Assert.assertNotNull(fooResult);
+ Assert.assertNotNull(fooBarResult);
+ Assert.assertNotSame(fooResult, fooBarResult);
+ Assert.assertEquals(fooResult.getResultPath(), "/foo");
+ Assert.assertEquals(fooBarResult.getResultPath(), "/foo/bar");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBackground() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ CuratorOp createOp1 = client.transactionOp().create().forPath("/foo");
+ CuratorOp createOp2 = client.transactionOp().create().forPath("/foo/bar", "snafu".getBytes());
+
+ final BlockingQueue<List<CuratorTransactionResult>> queue = Queues.newLinkedBlockingQueue();
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ queue.add(event.getOpResults());
+ }
+ };
+ client.transaction().inBackground(callback).forOperations(createOp1, createOp2);
+ Collection<CuratorTransactionResult> results = queue.poll(5, TimeUnit.SECONDS);
+
+ Assert.assertNotNull(results);
+ Assert.assertTrue(client.checkExists().forPath("/foo/bar") != null);
+ Assert.assertEquals(client.getData().forPath("/foo/bar"), "snafu".getBytes());
+
+ CuratorTransactionResult fooResult = Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
+ CuratorTransactionResult fooBarResult = Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
+ Assert.assertNotNull(fooResult);
+ Assert.assertNotNull(fooBarResult);
+ Assert.assertNotSame(fooResult, fooBarResult);
+ Assert.assertEquals(fooResult.getResultPath(), "/foo");
+ Assert.assertEquals(fooBarResult.getResultPath(), "/foo/bar");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBackgroundWithNamespace() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace("galt").build();
+ try
+ {
+ client.start();
+ CuratorOp createOp1 = client.transactionOp().create().forPath("/foo", "one".getBytes());
+ CuratorOp createOp2 = client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-", "one".getBytes());
+ CuratorOp setDataOp = client.transactionOp().setData().forPath("/foo", "two".getBytes());
+ CuratorOp createOp3 = client.transactionOp().create().forPath("/foo/bar");
+ CuratorOp deleteOp = client.transactionOp().delete().forPath("/foo/bar");
+
+ final BlockingQueue<List<CuratorTransactionResult>> queue = Queues.newLinkedBlockingQueue();
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ queue.add(event.getOpResults());
+ }
+ };
+ client.transaction().inBackground(callback).forOperations(createOp1, createOp2, setDataOp, createOp3, deleteOp);
+
+ Collection<CuratorTransactionResult> results = queue.poll(5, TimeUnit.SECONDS);
+
+ Assert.assertNotNull(results);
+ Assert.assertTrue(client.checkExists().forPath("/foo") != null);
+ Assert.assertTrue(client.usingNamespace(null).checkExists().forPath("/galt/foo") != null);
+ Assert.assertEquals(client.getData().forPath("/foo"), "two".getBytes());
+ Assert.assertTrue(client.checkExists().forPath("/foo/bar") == null);
+
+ CuratorTransactionResult ephemeralResult = Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
+ Assert.assertNotNull(ephemeralResult);
+ Assert.assertNotEquals(ephemeralResult.getResultPath(), "/test-");
+ Assert.assertTrue(ephemeralResult.getResultPath().startsWith("/test-"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactions.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactionsOld.java
similarity index 93%
rename from curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactions.java
rename to curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactionsOld.java
index ae2cf1d..f0147d5 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactions.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTransactionsOld.java
@@ -25,6 +25,7 @@
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -32,15 +33,16 @@
import org.testng.annotations.Test;
import java.util.Collection;
-public class TestTransactions extends BaseClassForTests
+@SuppressWarnings("deprecation")
+public class TestTransactionsOld extends BaseClassForTests
{
@Test
public void testCheckVersion() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
try
{
+ client.start();
client.create().forPath("/foo");
Stat stat = client.setData().forPath("/foo", "new".getBytes()); // up the version
@@ -64,7 +66,7 @@
}
finally
{
- client.close();
+ CloseableUtils.closeQuietly(client);
}
}
@@ -72,9 +74,9 @@
public void testWithNamespace() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace("galt").build();
- client.start();
try
{
+ client.start();
Collection<CuratorTransactionResult> results =
client.inTransaction()
.create().forPath("/foo", "one".getBytes())
@@ -101,7 +103,7 @@
}
finally
{
- client.close();
+ CloseableUtils.closeQuietly(client);
}
}
@@ -109,9 +111,9 @@
public void testBasic() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
try
{
+ client.start();
Collection<CuratorTransactionResult> results =
client.inTransaction()
.create().forPath("/foo")
@@ -133,7 +135,7 @@
}
finally
{
- client.close();
+ CloseableUtils.closeQuietly(client);
}
}
}
diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index de45404..3394361 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-recipes</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Curator Recipes</name>
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 5f10c5e..ce5b23d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -59,13 +59,6 @@
super.startup(new ChaosMonkeyZookeeperServer(zks));
}
- /**
- * Build a connection with a Chaos Monkey ZookeeperServer
- */
- protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException
- {
- return new NIOServerCnxn(zkServer, sock, sk, this);
- }
public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer
{
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 96809d8..80eedb2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-test</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<name>Curator Testing</name>
<description>Unit testing utilities.</description>
diff --git a/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java b/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java
index b39a949..6d495df 100644
--- a/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java
+++ b/curator-test/src/main/java/org/apache/curator/test/InstanceSpec.java
@@ -70,6 +70,10 @@
private final int tickTime;
private final int maxClientCnxns;
+ public static void reset() {
+ nextServerId.set(1);
+ }
+
public static InstanceSpec newInstanceSpec()
{
return new InstanceSpec(null, -1, -1, -1, true, -1, -1, -1);
diff --git a/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java b/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java
index 8add08e..02979ee 100644
--- a/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java
+++ b/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java
@@ -99,7 +99,7 @@
{
for ( InstanceSpec thisSpec : instanceSpecs )
{
- properties.setProperty("server." + thisSpec.getServerId(), String.format("localhost:%d:%d", thisSpec.getQuorumPort(), thisSpec.getElectionPort()));
+ properties.setProperty("server." + thisSpec.getServerId(), String.format("localhost:%d:%d;localhost:%d", thisSpec.getQuorumPort(), thisSpec.getElectionPort(), thisSpec.getPort()));
}
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index cd86b72..f6bdbd8 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -249,6 +249,7 @@
private static Map<InstanceSpec, Collection<InstanceSpec>> makeSpecs(int instanceQty)
{
+ InstanceSpec.reset();
ImmutableList.Builder<InstanceSpec> builder = ImmutableList.builder();
for ( int i = 0; i < instanceQty; ++i )
{
diff --git a/curator-x-discovery-server/pom.xml b/curator-x-discovery-server/pom.xml
index 2728ce5..0bd9670 100644
--- a/curator-x-discovery-server/pom.xml
+++ b/curator-x-discovery-server/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-x-discovery-server</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Curator Service Discovery Server</name>
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 015fea7..6f380ae 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>curator-x-discovery</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Curator Service Discovery</name>
diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml
index 299ef9f..e6ed5e8 100644
--- a/curator-x-rpc/pom.xml
+++ b/curator-x-rpc/pom.xml
@@ -22,12 +22,12 @@
<parent>
<artifactId>apache-curator</artifactId>
<groupId>org.apache.curator</groupId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>curator-x-rpc</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<name>Curator RPC Proxy</name>
<description>A proxy that bridges non-java environments with the Curator framework and recipes</description>
diff --git a/pom.xml b/pom.xml
index cb747b7..8c984ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
- <version>2.8.1-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache Curator</name>
@@ -56,12 +56,12 @@
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <jdk-version>1.6</jdk-version>
+ <jdk-version>1.7</jdk-version>
<surefire-forkcount>1</surefire-forkcount>
<!-- versions -->
- <zookeeper-version>3.4.6</zookeeper-version>
+ <zookeeper-version>3.5.0-alpha</zookeeper-version>
<maven-project-info-reports-plugin-version>2.7</maven-project-info-reports-plugin-version>
<maven-bundle-plugin-version>2.3.7</maven-bundle-plugin-version>
<maven-javadoc-plugin-version>2.10.3</maven-javadoc-plugin-version>
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 16bbc13..f0d927d 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -25,3 +25,8 @@
Due to limitations in ZooKeeper's transport layer, a single queue will break if it has more than 10K\-ish items in it. This class
provides a facade over multiple distributed queues. It monitors the queues and if any one of them goes over a threshold, a new
queue is added. Puts are distributed amongst the queues.
+
+h2. EnsembleTracker
+
+Utility to listen for ensemble/configuration changes via registered EnsembleListeners. Allocate a EnsembleTracker, add one or more listeners
+and start it.