blob: 5b163c993f8748a828e0e0643d45828c13003e37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
/**
 * Test for the JMX adaptor: registers a test MXBean with the MBean server,
 * adds a JMXAdaptor to a running {@link ChukwaAgent} through an "add" control
 * command, and verifies the JSON payload of the chunks that arrive on the
 * agent's event queue.
 */
public class TestJMXAdaptor extends TestCase {
  /** JMX object name under which the test bean is registered. */
  private static final String TEST_MBEAN_NAME = "chukwa:type=test";
  /** Maximum time, in milliseconds, to wait for the collector thread. */
  private static final long COLLECT_TIMEOUT_MS = 20000;

  MBeanServer mbs;
  ChukwaAgent agent;
  File baseDir, checkpointDir;
  /** Name of the bean registered by the test; null when nothing needs unregistering. */
  private ObjectName registeredName;

  /**
   * Obtains the MBean server, creates an empty checkpoint directory under
   * the {@code test.build.data} directory (default {@code /tmp}) and starts
   * an agent configured with control port 9093, HTTP port 9090 and
   * checkpointing disabled.
   *
   * @throws Exception if the directories cannot be resolved or the agent
   *         cannot be created or started
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    mbs = JMXAgent.getMBeanServerInstance();
    baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
    checkpointDir = new File(baseDir, "addAdaptorTestCheckpoints");
    createEmptyDir(checkpointDir);
    Configuration conf = new Configuration();
    conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
    conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
    conf.setInt("chukwaAgent.control.port", 9093);
    conf.setInt("chukwaAgent.http.port", 9090);
    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
    agent = ChukwaAgent.getAgent(conf);
    agent.start();
  }

  /**
   * Registers a populated test bean, adds a JMXAdaptor to the agent, waits
   * up to {@link #COLLECT_TIMEOUT_MS} for a worker thread to collect and
   * verify the emitted chunks, then stops the adaptor again.
   */
  public void testJMXAdaptor() {
    MXBeanImpl mxbean = null;
    try {
      mxbean = new MXBeanImpl();
      ObjectName name = new ObjectName(TEST_MBEAN_NAME);
      mbs.registerMBean(mxbean, name);
      // Remember the name so tearDown can unregister the bean; otherwise a
      // second run in the same JVM fails on the duplicate registration.
      registeredName = name;
    } catch (Exception e) {
      e.printStackTrace();
      fail("Failed to instantiate and register test mbean");
    }

    Map<Integer, String> m = new HashMap<Integer, String>();
    m.put(1, "a");
    m.put(2, "b");
    m.put(3, "c");

    Queue<String> queue = new ArrayBlockingQueue<String>(10);
    queue.add("Message1");
    queue.add("Message2");
    queue.add("Message3");

    String[] sarray = new String[] {"Screw", "you", "guys", "I'm", "going", "home"};

    mxbean.setQueue(queue);
    mxbean.setInt(20);
    mxbean.setMap(m);
    mxbean.setString("TestString");
    mxbean.setStringArray(sarray);

    assertEquals(0, agent.adaptorCount());
    System.out.println("adding jmx adaptor");
    String id = agent.processAddCommand("add JMXAdaptor DebugProcessor localhost 10100 10 chukwa:* 0");

    // A thread that can block on ChunkQueue and can be interrupted
    class Collector implements Runnable {
      // Written by the worker thread; only read after the thread has ended.
      String fail = null;

      public String getFailMessage() {
        return fail;
      }

      public void run() {
        try {
          ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
          List<Chunk> evts = new ArrayList<Chunk>();
          eventQueue.collect(evts, 1);

          // Expected - {"CompositeType":"3","String":"TestString","StringArray":6,"Map":"3","Int":20}
          // NOTE(review): the assertion below compares StringArray with the
          // string "6", not a number; confirm which form the adaptor emits.
          for (Chunk e : evts) {
            String data = new String(e.getData());
            JSONObject obj = (JSONObject) JSONValue.parse(data);
            assertNotNull("Chunk data is not valid JSON: " + data, obj);
            // Expected value goes first so failure messages read correctly.
            assertEquals("3", obj.get("CompositeType"));
            assertEquals("TestString", obj.get("String"));
            assertEquals("6", obj.get("StringArray"));
            assertEquals("3", obj.get("Map"));
            // String.valueOf turns a missing key into an assertion failure
            // instead of a NullPointerException.
            assertEquals("20", String.valueOf(obj.get("Int")));
            System.out.println("Verified all data collected by JMXAdaptor");
          }
        } catch (InterruptedException e1) {
          e1.printStackTrace();
          Thread.currentThread().interrupt();
          fail = "JMXAdaptor failed to collect all data; it was interrupted";
        } catch (AssertionFailedError e2) {
          e2.printStackTrace();
          fail = "Assert failed while verifying JMX data- " + e2.getMessage();
        } catch (Exception e3) {
          e3.printStackTrace();
          fail = "Exception in collector thread. Check the test output for stack trace";
        }
      }
    }

    // Everything after the add command runs under try/finally so the adaptor
    // is stopped even when an assertion or the timeout fails the test.
    try {
      assertEquals(1, agent.adaptorCount());
      try {
        Collector worker = new Collector();
        Thread t = new Thread(worker);
        t.start();
        t.join(COLLECT_TIMEOUT_MS);
        if (t.isAlive()) {
          t.interrupt();
          fail("JMXAdaptor failed to collect data after 20s. Check agent log and surefire report");
        }
        String failMessage = worker.getFailMessage();
        if (failMessage != null) {
          fail(failMessage);
        }
      } catch (Exception e) {
        e.printStackTrace();
        fail("Exception in TestJMXAdaptor");
      }
    } finally {
      System.out.println("shutting down jmx adaptor");
      if (id != null) {
        agent.stopAdaptor(id, true);
      }
    }
    assertEquals(0, agent.adaptorCount());
  }

  /**
   * Recursively deletes everything below {@code dir}. A directory itself is
   * kept (only its contents are removed); a regular file is deleted.
   *
   * @param dir the directory (or file) to clear
   * @return true if {@code dir} existed when the method was called
   */
  public static boolean nukeDirContents(File dir) {
    if (!dir.exists()) {
      return false;
    }
    if (dir.isDirectory()) {
      // listFiles() returns null on an I/O error or when the directory
      // disappears concurrently; treat that as "nothing to delete".
      File[] children = dir.listFiles();
      if (children != null) {
        for (File f : children) {
          nukeDirContents(f);
          f.delete();
        }
      }
    } else {
      dir.delete();
    }
    return true;
  }

  /**
   * Ensures {@code dir} exists as an empty directory, clearing existing
   * contents and creating it, with any missing parents, when necessary.
   * Fails the test if that cannot be achieved.
   *
   * @param dir the directory to create or empty
   */
  public static void createEmptyDir(File dir) {
    nukeDirContents(dir);
    // Covers both a missing path and a path that was a regular file, which
    // nukeDirContents has just deleted.
    if (!dir.isDirectory()) {
      dir.mkdirs();
    }
    File[] remaining = dir.listFiles();
    assertTrue("Could not create empty directory " + dir,
        dir.isDirectory() && remaining != null && remaining.length == 0);
  }

  /**
   * Unregisters the test bean if this test registered it and removes the
   * checkpoint directory.
   *
   * @throws Exception if the bean cannot be unregistered
   */
  @Override
  protected void tearDown() throws Exception {
    // NOTE(review): the agent started in setUp is not shut down here;
    // confirm whether ChukwaAgent offers a shutdown call this test should use.
    try {
      if (mbs != null && registeredName != null && mbs.isRegistered(registeredName)) {
        mbs.unregisterMBean(registeredName);
      }
    } finally {
      registeredName = null;
      // checkpointDir is still null when setUp failed before assigning it.
      if (checkpointDir != null) {
        nukeDirContents(checkpointDir);
        checkpointDir.delete();
      }
      super.tearDown();
    }
  }
}