| <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> |
| <html> |
| <head> |
| <META http-equiv="Content-Type" content="text/html; charset=UTF-8"> |
| <meta content="Apache Forrest" name="Generator"> |
| <meta name="Forrest-version" content="0.8"> |
| <meta name="Forrest-skin-name" content="pelt"> |
| <title>Programming with ZooKeeper - A basic tutorial</title> |
| <link type="text/css" href="skin/basic.css" rel="stylesheet"> |
| <link media="screen" type="text/css" href="skin/screen.css" rel="stylesheet"> |
| <link media="print" type="text/css" href="skin/print.css" rel="stylesheet"> |
| <link type="text/css" href="skin/profile.css" rel="stylesheet"> |
| <script src="skin/getBlank.js" language="javascript" type="text/javascript"></script><script src="skin/getMenu.js" language="javascript" type="text/javascript"></script><script src="skin/fontsize.js" language="javascript" type="text/javascript"></script> |
| <link rel="shortcut icon" href="images/favicon.ico"> |
| </head> |
| <body onload="init()"> |
| <script type="text/javascript">ndeSetTextSize();</script> |
| <div id="top"> |
| <!--+ |
| |breadtrail |
| +--> |
| <div class="breadtrail"> |
| <a href="http://www.apache.org/">Apache</a> > <a href="http://hadoop.apache.org/">Hadoop</a> > <a href="http://hadoop.apache.org/zookeeper/">ZooKeeper</a><script src="skin/breadcrumbs.js" language="JavaScript" type="text/javascript"></script> |
| </div> |
| <!--+ |
| |header |
| +--> |
| <div class="header"> |
| <!--+ |
| |start group logo |
| +--> |
| <div class="grouplogo"> |
| <a href="http://hadoop.apache.org/"><img class="logoImage" alt="Hadoop" src="images/hadoop-logo.jpg" title="Apache Hadoop"></a> |
| </div> |
| <!--+ |
| |end group logo |
| +--> |
| <!--+ |
| |start Project Logo |
| +--> |
| <div class="projectlogo"> |
| <a href="http://hadoop.apache.org/zookeeper/"><img class="logoImage" alt="ZooKeeper" src="images/zookeeper_small.gif" title="ZooKeeper: distributed coordination"></a> |
| </div> |
| <!--+ |
| |end Project Logo |
| +--> |
| <!--+ |
| |start Search |
| +--> |
| <div class="searchbox"> |
| <form action="http://www.google.com/search" method="get" class="roundtopsmall"> |
| <input value="hadoop.apache.org" name="sitesearch" type="hidden"><input onFocus="getBlank (this, 'Search the site with google');" size="25" name="q" id="query" type="text" value="Search the site with google"> |
| <input name="Search" value="Search" type="submit"> |
| </form> |
| </div> |
| <!--+ |
| |end search |
| +--> |
| <!--+ |
| |start Tabs |
| +--> |
| <ul id="tabs"> |
| <li> |
| <a class="unselected" href="http://hadoop.apache.org/zookeeper/">Project</a> |
| </li> |
| <li> |
| <a class="unselected" href="http://wiki.apache.org/hadoop/ZooKeeper">Wiki</a> |
| </li> |
| <li class="current"> |
| <a class="selected" href="index.html">ZooKeeper 3.2 Documentation</a> |
| </li> |
| </ul> |
| <!--+ |
| |end Tabs |
| +--> |
| </div> |
| </div> |
| <div id="main"> |
| <div id="publishedStrip"> |
| <!--+ |
| |start Subtabs |
| +--> |
| <div id="level2tabs"></div> |
| <!--+ |
| |end Endtabs |
| +--> |
| <script type="text/javascript"><!-- |
| document.write("Last Published: " + document.lastModified); |
| // --></script> |
| </div> |
| <!--+ |
| |breadtrail |
| +--> |
| <div class="breadtrail"> |
| |
| |
| </div> |
| <!--+ |
| |start Menu, mainarea |
| +--> |
| <!--+ |
| |start Menu |
| +--> |
| <div id="menu"> |
| <div onclick="SwitchMenu('menu_1.1', 'skin/')" id="menu_1.1Title" class="menutitle">Overview</div> |
| <div id="menu_1.1" class="menuitemgroup"> |
| <div class="menuitem"> |
| <a href="index.html">Welcome</a> |
| </div> |
| <div class="menuitem"> |
| <a href="zookeeperOver.html">Overview</a> |
| </div> |
| <div class="menuitem"> |
| <a href="zookeeperStarted.html">Getting Started</a> |
| </div> |
| <div class="menuitem"> |
| <a href="releasenotes.html">Release Notes</a> |
| </div> |
| </div> |
| <div onclick="SwitchMenu('menu_selected_1.2', 'skin/')" id="menu_selected_1.2Title" class="menutitle" style="background-image: url('skin/images/chapter_open.gif');">Developer</div> |
| <div id="menu_selected_1.2" class="selectedmenuitemgroup" style="display: block;"> |
| <div class="menuitem"> |
| <a href="api/index.html">API Docs</a> |
| </div> |
| <div class="menuitem"> |
| <a href="zookeeperProgrammers.html">Programmer's Guide</a> |
| </div> |
| <div class="menuitem"> |
| <a href="javaExample.html">Java Example</a> |
| </div> |
| <div class="menupage"> |
| <div class="menupagetitle">Barrier and Queue Tutorial</div> |
| </div> |
| <div class="menuitem"> |
| <a href="recipes.html">Recipes</a> |
| </div> |
| </div> |
| <div onclick="SwitchMenu('menu_1.3', 'skin/')" id="menu_1.3Title" class="menutitle">BookKeeper</div> |
| <div id="menu_1.3" class="menuitemgroup"> |
| <div class="menuitem"> |
| <a href="bookkeeperStarted.html">Getting started</a> |
| </div> |
| <div class="menuitem"> |
| <a href="bookkeeperOverview.html">Overview</a> |
| </div> |
| <div class="menuitem"> |
| <a href="bookkeeperConfig.html">Setup guide</a> |
| </div> |
| <div class="menuitem"> |
| <a href="bookkeeperProgrammer.html">Programmer's guide</a> |
| </div> |
| </div> |
| <div onclick="SwitchMenu('menu_1.4', 'skin/')" id="menu_1.4Title" class="menutitle">Admin & Ops</div> |
| <div id="menu_1.4" class="menuitemgroup"> |
| <div class="menuitem"> |
| <a href="zookeeperAdmin.html">Administrator's Guide</a> |
| </div> |
| <div class="menuitem"> |
| <a href="zookeeperQuotas.html">Quota Guide</a> |
| </div> |
| <div class="menuitem"> |
| <a href="zookeeperJMX.html">JMX</a> |
| </div> |
| </div> |
| <div onclick="SwitchMenu('menu_1.5', 'skin/')" id="menu_1.5Title" class="menutitle">Contributor</div> |
| <div id="menu_1.5" class="menuitemgroup"> |
| <div class="menuitem"> |
| <a href="zookeeperInternals.html">ZooKeeper Internals</a> |
| </div> |
| </div> |
| <div onclick="SwitchMenu('menu_1.6', 'skin/')" id="menu_1.6Title" class="menutitle">Miscellaneous</div> |
| <div id="menu_1.6" class="menuitemgroup"> |
| <div class="menuitem"> |
| <a href="http://wiki.apache.org/hadoop/ZooKeeper">Wiki</a> |
| </div> |
| <div class="menuitem"> |
| <a href="http://wiki.apache.org/hadoop/ZooKeeper/FAQ">FAQ</a> |
| </div> |
| <div class="menuitem"> |
| <a href="http://hadoop.apache.org/zookeeper/mailing_lists.html">Mailing Lists</a> |
| </div> |
| </div> |
| <div id="credit"></div> |
| <div id="roundbottom"> |
| <img style="display: none" class="corner" height="15" width="15" alt="" src="skin/images/rc-b-l-15-1body-2menu-3menu.png"></div> |
| <!--+ |
| |alternative credits |
| +--> |
| <div id="credit2"></div> |
| </div> |
| <!--+ |
| |end Menu |
| +--> |
| <!--+ |
| |start content |
| +--> |
| <div id="content"> |
| <div title="Portable Document Format" class="pdflink"> |
| <a class="dida" href="zookeeperTutorial.pdf"><img alt="PDF -icon" src="skin/images/pdfdoc.gif" class="skin"><br> |
| PDF</a> |
| </div> |
| <h1>Programming with ZooKeeper - A basic tutorial</h1> |
| <div id="minitoc-area"> |
| <ul class="minitoc"> |
| <li> |
| <a href="#ch_Introduction">Introduction</a> |
| </li> |
| <li> |
| <a href="#sc_barriers">Barriers</a> |
| </li> |
| <li> |
| <a href="#sc_producerConsumerQueues">Producer-Consumer Queues</a> |
| </li> |
| <li> |
| <a href="#sc_sourceListing">Complete Source Listing</a> |
| </li> |
| </ul> |
| </div> |
| |
| |
| |
| |
| |
| <a name="N10009"></a><a name="ch_Introduction"></a> |
| <h2 class="h3">Introduction</h2> |
| <div class="section"> |
| <p>In this tutorial, we show simple implementations of barriers and |
| producer-consumer queues using ZooKeeper. We call the respective classes Barrier and Queue. |
| These examples assume that you have at least one ZooKeeper server running.</p> |
| <p>Both primitives use the following common excerpt of code:</p> |
| <pre class="code"> |
| static ZooKeeper zk = null; |
| static Integer mutex; |
| |
| String root; |
| |
| SyncPrimitive(String address) { |
| if(zk == null){ |
| try { |
| System.out.println("Starting ZK:"); |
| zk = new ZooKeeper(address, 3000, this); |
| mutex = new Integer(-1); |
| System.out.println("Finished starting ZK: " + zk); |
| } catch (IOException e) { |
| System.out.println(e.toString()); |
| zk = null; |
| } |
| } |
| } |
| |
| synchronized public void process(WatchedEvent event) { |
| synchronized (mutex) { |
| mutex.notify(); |
| } |
| } |
| </pre> |
| <p>Both classes extend SyncPrimitive. In this way, we execute steps that are |
| common to all primitives in the constructor of SyncPrimitive. To keep the examples |
| simple, we create a ZooKeeper object the first time we instantiate either a barrier |
| object or a queue object, and we declare a static variable that is a reference |
| to this object. The subsequent instances of Barrier and Queue check whether a |
| ZooKeeper object exists. Alternatively, we could have the application creating a |
| ZooKeeper object and passing it to the constructor of Barrier and Queue.</p> |
| <p> |
| We use the process() method to process notifications triggered due to watches. |
| In the following discussion, we present code that sets watches. A watch is internal |
| structure that enables ZooKeeper to notify a client of a change to a node. For example, |
| if a client is waiting for other clients to leave a barrier, then it can set a watch and |
| wait for modifications to a particular node, which can indicate that it is the end of the wait. |
| This point becomes clear once we go over the examples. |
| </p> |
| </div> |
| |
| |
| <a name="N1001F"></a><a name="sc_barriers"></a> |
| <h2 class="h3">Barriers</h2> |
| <div class="section"> |
| <p> |
| A barrier is a primitive that enables a group of processes to synchronize the |
| beginning and the end of a computation. The general idea of this implementation |
| is to have a barrier node that serves the purpose of being a parent for individual |
| process nodes. Suppose that we call the barrier node "/b1". Each process "p" then |
| creates a node "/b1/p". Once enough processes have created their corresponding |
| nodes, joined processes can start the computation. |
| </p> |
| <p>In this example, each process instantiates a Barrier object, and its constructor takes as parameters:</p> |
| <ul> |
| <li> |
| <p>the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")</p> |
| </li> |
| |
| <li> |
| <p>the path of the barrier node on ZooKeeper (e.g., "/b1")</p> |
| </li> |
| |
| <li> |
| <p>the size of the group of processes</p> |
| </li> |
| |
| </ul> |
| <p>The constructor of Barrier passes the address of the Zookeeper server to the |
| constructor of the parent class. The parent class creates a ZooKeeper instance if |
| one does not exist. The constructor of Barrier then creates a |
| barrier node on ZooKeeper, which is the parent node of all process nodes, and |
| we call root (<strong>Note:</strong> This is not the ZooKeeper root "/").</p> |
| <pre class="code"> |
| /** |
| * Barrier constructor |
| * |
| * @param address |
| * @param root |
| * @param size |
| */ |
| Barrier(String address, String root, int size) { |
| super(address); |
| this.root = root; |
| this.size = size; |
| |
| // Create barrier node |
| if (zk != null) { |
| try { |
| Stat s = zk.exists(root, false); |
| if (s == null) { |
| zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| } |
| } catch (KeeperException e) { |
| System.out |
| .println("Keeper exception when instantiating queue: " |
| + e.toString()); |
| } catch (InterruptedException e) { |
| System.out.println("Interrupted exception"); |
| } |
| } |
| |
| // My node name |
| try { |
| name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); |
| } catch (UnknownHostException e) { |
| System.out.println(e.toString()); |
| } |
| |
| } |
| </pre> |
| <p> |
| To enter the barrier, a process calls enter(). The process creates a node under |
| the root to represent it, using its host name to form the node name. It then wait |
| until enough processes have entered the barrier. A process does it by checking |
| the number of children the root node has with "getChildren()", and waiting for |
| notifications in the case it does not have enough. To receive a notification when |
| there is a change to the root node, a process has to set a watch, and does it |
| through the call to "getChildren()". In the code, we have that "getChildren()" |
| has two parameters. The first one states the node to read from, and the second is |
| a boolean flag that enables the process to set a watch. In the code the flag is true. |
| </p> |
| <pre class="code"> |
| /** |
| * Join barrier |
| * |
| * @return |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| |
| boolean enter() throws KeeperException, InterruptedException{ |
| zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.EPHEMERAL_SEQUENTIAL); |
| while (true) { |
| synchronized (mutex) { |
| List<String> list = zk.getChildren(root, true); |
| |
| if (list.size() < size) { |
| mutex.wait(); |
| } else { |
| return true; |
| } |
| } |
| } |
| } |
| </pre> |
| <p> |
| Note that enter() throws both KeeperException and InterruptedException, so it is |
| the reponsability of the application to catch and handle such exceptions.</p> |
| <p> |
| Once the computation is finished, a process calls leave() to leave the barrier. |
| First it deletes its corresponding node, and then it gets the children of the root |
| node. If there is at least one child, then it waits for a notification (obs: note |
| that the second parameter of the call to getChildren() is true, meaning that |
| ZooKeeper has to set a watch on the the root node). Upon reception of a notification, |
| it checks once more whether the root node has any child.</p> |
| <pre class="code"> |
| /** |
| * Wait until all reach barrier |
| * |
| * @return |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| |
| boolean leave() throws KeeperException, InterruptedException{ |
| zk.delete(root + "/" + name, 0); |
| while (true) { |
| synchronized (mutex) { |
| List<String> list = zk.getChildren(root, true); |
| if (list.size() > 0) { |
| mutex.wait(); |
| } else { |
| return true; |
| } |
| } |
| } |
| } |
| } |
| </pre> |
| </div> |
| |
| <a name="N10051"></a><a name="sc_producerConsumerQueues"></a> |
| <h2 class="h3">Producer-Consumer Queues</h2> |
| <div class="section"> |
| <p> |
| A producer-consumer queue is a distributed data estructure thata group of processes |
| use to generate and consume items. Producer processes create new elements and add |
| them to the queue. Consumer processes remove elements from the list, and process them. |
| In this implementation, the elements are simple integers. The queue is represented |
| by a root node, and to add an element to the queue, a producer process creates a new node, |
| a child of the root node. |
| </p> |
| <p> |
| The following excerpt of code corresponds to the constructor of the object. As |
| with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, |
| that creates a ZooKeeper object if one doesn't exist. It then verifies if the root |
| node of the queue exists, and creates if it doesn't. |
| </p> |
| <pre class="code"> |
| /** |
| * Constructor of producer-consumer queue |
| * |
| * @param address |
| * @param name |
| */ |
| Queue(String address, String name) { |
| super(address); |
| this.root = name; |
| // Create ZK node name |
| if (zk != null) { |
| try { |
| Stat s = zk.exists(root, false); |
| if (s == null) { |
| zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| } |
| } catch (KeeperException e) { |
| System.out |
| .println("Keeper exception when instantiating queue: " |
| + e.toString()); |
| } catch (InterruptedException e) { |
| System.out.println("Interrupted exception"); |
| } |
| } |
| } |
| </pre> |
| <p> |
| A producer process calls "produce()" to add an element to the queue, and passes |
| an integer as an argument. To add an element to the queue, the method creates a |
| new node using "create()", and uses the SEQUENCE flag to instruct ZooKeeper to |
| append the value of the sequencer counter associated to the root node. In this way, |
| we impose a total order on the elements of the queue, thus guaranteeing that the |
| oldest element of the queue is the next one consumed. |
| </p> |
| <pre class="code"> |
| /** |
| * Add element to the queue. |
| * |
| * @param i |
| * @return |
| */ |
| |
| boolean produce(int i) throws KeeperException, InterruptedException{ |
| ByteBuffer b = ByteBuffer.allocate(4); |
| byte[] value; |
| |
| // Add child with value i |
| b.putInt(i); |
| value = b.array(); |
| zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT_SEQUENTIAL); |
| |
| return true; |
| } |
| </pre> |
| <p> |
| To consume an element, a consumer process obtains the children of the root node, |
| reads the node with smallest counter value, and returns the element. Note that |
| if there is a conflict, then one of the two contending processes won't be able to |
| delete the node and the delete operation will throw an exception.</p> |
| <p> |
| A call to getChildren() returns the list of children in lexicographic order. |
| As lexicographic order does not necessary follow the numerical order of the counter |
| values, we need to decide which element is the smallest. To decide which one has |
| the smallest counter value, we traverse the list, and remove the prefix "element" |
| from each one.</p> |
| <pre class="code"> |
| /** |
| * Remove first element from the queue. |
| * |
| * @return |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| int consume() throws KeeperException, InterruptedException{ |
| int retvalue = -1; |
| Stat stat = null; |
| |
| // Get the first element available |
| while (true) { |
| synchronized (mutex) { |
| List<String> list = zk.getChildren(root, true); |
| if (list.size() == 0) { |
| System.out.println("Going to wait"); |
| mutex.wait(); |
| } else { |
| Integer min = new Integer(list.get(0).substring(7)); |
| for(String s : list){ |
| Integer tempValue = new Integer(s.substring(7)); |
| //System.out.println("Temporary value: " + tempValue); |
| if(tempValue < min) min = tempValue; |
| } |
| System.out.println("Temporary value: " + root + "/element" + min); |
| byte[] b = zk.getData(root + "/element" + min, |
| false, stat); |
| zk.delete(root + "/element" + min, 0); |
| ByteBuffer buffer = ByteBuffer.wrap(b); |
| retvalue = buffer.getInt(); |
| |
| return retvalue; |
| } |
| } |
| } |
| } |
| } |
| </pre> |
| </div> |
| |
| <a name="N1006F"></a><a name="sc_sourceListing"></a> |
| <h2 class="h3">Complete Source Listing</h2> |
| <div class="section"> |
| <div class="note example"> |
| <div class="label">SyncPrimitive.Java</div> |
| <div class="content"> |
| |
| <title>SyncPrimitive.Java</title> |
| |
| <pre class="code"> |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.Random; |
| |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.ZooDefs.Ids; |
| import org.apache.zookeeper.data.Stat; |
| |
| public class SyncPrimitive implements Watcher { |
| |
| static ZooKeeper zk = null; |
| static Integer mutex; |
| |
| String root; |
| |
| SyncPrimitive(String address) { |
| if(zk == null){ |
| try { |
| System.out.println("Starting ZK:"); |
| zk = new ZooKeeper(address, 3000, this); |
| mutex = new Integer(-1); |
| System.out.println("Finished starting ZK: " + zk); |
| } catch (IOException e) { |
| System.out.println(e.toString()); |
| zk = null; |
| } |
| } |
| //else mutex = new Integer(-1); |
| } |
| |
| synchronized public void process(WatchedEvent event) { |
| synchronized (mutex) { |
| //System.out.println("Process: " + event.getType()); |
| mutex.notify(); |
| } |
| } |
| |
| /** |
| * Barrier |
| */ |
| static public class Barrier extends SyncPrimitive { |
| int size; |
| String name; |
| |
| /** |
| * Barrier constructor |
| * |
| * @param address |
| * @param root |
| * @param size |
| */ |
| Barrier(String address, String root, int size) { |
| super(address); |
| this.root = root; |
| this.size = size; |
| |
| // Create barrier node |
| if (zk != null) { |
| try { |
| Stat s = zk.exists(root, false); |
| if (s == null) { |
| zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| } |
| } catch (KeeperException e) { |
| System.out |
| .println("Keeper exception when instantiating queue: " |
| + e.toString()); |
| } catch (InterruptedException e) { |
| System.out.println("Interrupted exception"); |
| } |
| } |
| |
| // My node name |
| try { |
| name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); |
| } catch (UnknownHostException e) { |
| System.out.println(e.toString()); |
| } |
| |
| } |
| |
| /** |
| * Join barrier |
| * |
| * @return |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| |
| boolean enter() throws KeeperException, InterruptedException{ |
| zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.EPHEMERAL_SEQUENTIAL); |
| while (true) { |
| synchronized (mutex) { |
| List<String> list = zk.getChildren(root, true); |
| |
| if (list.size() < size) { |
| mutex.wait(); |
| } else { |
| return true; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Wait until all reach barrier |
| * |
| * @return |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| |
| boolean leave() throws KeeperException, InterruptedException{ |
| zk.delete(root + "/" + name, 0); |
| while (true) { |
| synchronized (mutex) { |
| List<String> list = zk.getChildren(root, true); |
| if (list.size() > 0) { |
| mutex.wait(); |
| } else { |
| return true; |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Producer-Consumer queue |
| */ |
| static public class Queue extends SyncPrimitive { |
| |
| /** |
| * Constructor of producer-consumer queue |
| * |
| * @param address |
| * @param name |
| */ |
| Queue(String address, String name) { |
| super(address); |
| this.root = name; |
| // Create ZK node name |
| if (zk != null) { |
| try { |
| Stat s = zk.exists(root, false); |
| if (s == null) { |
| zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| } |
| } catch (KeeperException e) { |
| System.out |
| .println("Keeper exception when instantiating queue: " |
| + e.toString()); |
| } catch (InterruptedException e) { |
| System.out.println("Interrupted exception"); |
| } |
| } |
| } |
| |
| /** |
| * Add element to the queue. |
| * |
| * @param i |
| * @return |
| */ |
| |
| boolean produce(int i) throws KeeperException, InterruptedException{ |
| ByteBuffer b = ByteBuffer.allocate(4); |
| byte[] value; |
| |
| // Add child with value i |
| b.putInt(i); |
| value = b.array(); |
| zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT_SEQUENTIAL); |
| |
| return true; |
| } |
| |
| |
| /** |
| * Remove first element from the queue. |
| * |
| * @return |
| * @throws KeeperException |
| * @throws InterruptedException |
| */ |
| int consume() throws KeeperException, InterruptedException{ |
| int retvalue = -1; |
| Stat stat = null; |
| |
| // Get the first element available |
| while (true) { |
| synchronized (mutex) { |
| List<String> list = zk.getChildren(root, true); |
| if (list.size() == 0) { |
| System.out.println("Going to wait"); |
| mutex.wait(); |
| } else { |
| Integer min = new Integer(list.get(0).substring(7)); |
| for(String s : list){ |
| Integer tempValue = new Integer(s.substring(7)); |
| //System.out.println("Temporary value: " + tempValue); |
| if(tempValue < min) min = tempValue; |
| } |
| System.out.println("Temporary value: " + root + "/element" + min); |
| byte[] b = zk.getData(root + "/element" + min, |
| false, stat); |
| zk.delete(root + "/element" + min, 0); |
| ByteBuffer buffer = ByteBuffer.wrap(b); |
| retvalue = buffer.getInt(); |
| |
| return retvalue; |
| } |
| } |
| } |
| } |
| } |
| |
| public static void main(String args[]) { |
| if (args[0].equals("qTest")) |
| queueTest(args); |
| else |
| barrierTest(args); |
| |
| } |
| |
| public static void queueTest(String args[]) { |
| Queue q = new Queue(args[1], "/app1"); |
| |
| System.out.println("Input: " + args[1]); |
| int i; |
| Integer max = new Integer(args[2]); |
| |
| if (args[3].equals("p")) { |
| System.out.println("Producer"); |
| for (i = 0; i < max; i++) |
| try{ |
| q.produce(10 + i); |
| } catch (KeeperException e){ |
| |
| } catch (InterruptedException e){ |
| |
| } |
| } else { |
| System.out.println("Consumer"); |
| |
| for (i = 0; i < max; i++) { |
| try{ |
| int r = q.consume(); |
| System.out.println("Item: " + r); |
| } catch (KeeperException e){ |
| i--; |
| } catch (InterruptedException e){ |
| |
| } |
| } |
| } |
| } |
| |
| public static void barrierTest(String args[]) { |
| Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); |
| try{ |
| boolean flag = b.enter(); |
| System.out.println("Entered barrier: " + args[2]); |
| if(!flag) System.out.println("Error when entering the barrier"); |
| } catch (KeeperException e){ |
| |
| } catch (InterruptedException e){ |
| |
| } |
| |
| // Generate random integer |
| Random rand = new Random(); |
| int r = rand.nextInt(100); |
| // Loop for rand iterations |
| for (int i = 0; i < r; i++) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| |
| } |
| } |
| try{ |
| b.leave(); |
| } catch (KeeperException e){ |
| |
| } catch (InterruptedException e){ |
| |
| } |
| System.out.println("Left barrier"); |
| } |
| } |
| </pre> |
| </div> |
| </div> |
| </div> |
| |
| |
| <p align="right"> |
| <font size="-2"></font> |
| </p> |
| </div> |
| <!--+ |
| |end content |
| +--> |
| <div class="clearboth"> </div> |
| </div> |
| <div id="footer"> |
| <!--+ |
| |start bottomstrip |
| +--> |
| <div class="lastmodified"> |
| <script type="text/javascript"><!-- |
| document.write("Last Published: " + document.lastModified); |
| // --></script> |
| </div> |
| <div class="copyright"> |
| Copyright © |
| 2008 <a href="http://www.apache.org/licenses/">The Apache Software Foundation.</a> |
| </div> |
| <!--+ |
| |end bottomstrip |
| +--> |
| </div> |
| </body> |
| </html> |