blob: 23ae3ed2021e0c47c162621394da1203d1d12134 [file] [log] [blame]
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl;
import org.apache.hama.util.BSPNetUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.mortbay.log.Log;
public class TestZooKeeper extends TestCase {
private HamaConfiguration configuration;
public TestZooKeeper() {
configuration = new HamaConfiguration();
System.setProperty("user.dir", "/tmp");
configuration.set("bsp.master.address", "localhost");
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
configuration.set("bsp.local.dir", "/tmp/hama-test");
configuration.set("bsp.output.dir", "/tmp/hama-test_out");
configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT,
BSPNetUtils.getFreePort(20000));
configuration.set("hama.sync.client.class",
org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
.getCanonicalName());
}
@Test
public void testClearZKNodes() throws IOException, KeeperException,
InterruptedException {
final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl();
boolean done = false;
try {
server.init(configuration);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Runnable() {
@Override
public void run() {
try {
server.start();
} catch (Exception e) {
e.printStackTrace();
}
}
});
executorService.awaitTermination(10, TimeUnit.SECONDS);
String bspRoot = "/bsp";
ZooKeeperSyncClientImpl peerClient = (ZooKeeperSyncClientImpl) SyncServiceFactory
.getPeerSyncClient(configuration);
ZKSyncBSPMasterClient masterClient = (ZKSyncBSPMasterClient) SyncServiceFactory
.getMasterSyncClient(configuration);
masterClient.init(configuration);
Thread.sleep(100);
Log.info("Created master and client sync clients");
assertTrue(masterClient.hasKey(bspRoot));
Log.info("BSP root exists");
BSPJobID jobID = new BSPJobID("test1", 1);
masterClient.registerJob(jobID.toString());
TaskID taskId1 = new TaskID(jobID, 1);
TaskID taskId2 = new TaskID(jobID, 2);
TaskAttemptID task1 = new TaskAttemptID(taskId1, 1);
TaskAttemptID task2 = new TaskAttemptID(taskId2, 1);
int zkPort = BSPNetUtils.getFreePort(21815);
configuration.setInt(Constants.PEER_PORT, zkPort);
peerClient.init(configuration, jobID, task1);
peerClient.registerTask(jobID, "hamanode1", 5000L, task1);
peerClient.registerTask(jobID, "hamanode2", 5000L, task2);
peerClient.storeInformation(
peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
true, null);
String[] names = peerClient.getAllPeerNames(task1);
Log.info("Found child count = " + names.length);
assertEquals(2, names.length);
Log.info("Passed the child count test");
masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"), true,
null);
masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"), true,
null);
String[] peerChild = masterClient.getChildKeySet(
masterClient.constructKey(jobID, "peer"), null);
Log.info("Found child count = " + peerChild.length);
assertEquals(2, peerChild.length);
Log.info(" Peer name " + peerChild[0]);
Log.info(" Peer name " + peerChild[1]);
Log.info("Passed the child key set test");
masterClient.deregisterJob(jobID.toString());
Log.info(masterClient.constructKey(jobID));
Thread.sleep(200);
assertEquals(false, masterClient.hasKey(masterClient.constructKey(jobID)));
Log.info("Passed the key presence test");
boolean result = masterClient
.getInformation(masterClient.constructKey(jobID, "info", "level3"),
new IntWritable());
assertEquals(false, result);
Writable[] writableArr = new Writable[2];
writableArr[0] = new LongWritable(3L);
writableArr[1] = new LongWritable(5L);
ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
arrWritable.set(writableArr);
masterClient.storeInformation(
masterClient.constructKey(jobID, "info", "level3"), arrWritable,
true, null);
ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
boolean getResult = masterClient.getInformation(
masterClient.constructKey(jobID, "info", "level3"), valueHolder);
assertTrue(getResult);
assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
Log.info("Passed array writable test");
done = true;
} catch (Exception e) {
e.printStackTrace();
} finally {
server.stopServer();
}
assertEquals(true, done);
}
}