CHUKWA-771.  Fixed code quality issues identified by FindBugs.  (Eric Yang)
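
The hunks below fall into a handful of recurring FindBugs findings: byte/char conversions that silently use the platform default charset, static fields that should be final (or arrays that should not be public), inefficient String and boxed-number constructors, getters that expose internal arrays, blanket catch (Exception) blocks, and racy lazy initialization. Short illustrative sketches follow the relevant hunks.
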
diff --git a/CHANGES.txt b/CHANGES.txt
index 8192ffd..e23338b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,8 @@
 
   BUGS
 
+    CHUKWA-771.  Fixed code quality issues identified by FindBugs.  (Eric Yang)
+
     CHUKWA-770. Moved default dashboard population code to login.jsp. (Eric Yang)
 
     CHUKWA-766. Updated license on source files.  (Eric Yang)
diff --git a/pom.xml b/pom.xml
index ad931c8..82a7f03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -465,9 +465,9 @@
                       <goals>
                         <goal>compile</goal>
                       </goals>
-                      <compilerVersion>1.6</compilerVersion>
-                      <source>1.6</source>
-                      <target>1.6</target>
+                      <compilerVersion>1.7</compilerVersion>
+                      <source>1.7</source>
+                      <target>1.7</target>
                       <excludes>
                         <exclude>**/ChukwaJobTrackerInstrumentation.java</exclude>
                       </excludes>
@@ -1360,7 +1360,7 @@
         </plugin>
         <plugin>
             <artifactId>maven-pmd-plugin</artifactId>
-            <version>2.6</version>
+            <version>3.4</version>
             <reportSets>
                 <reportSet>
                     <reports>
@@ -1370,13 +1370,13 @@
                 </reportSet>
             </reportSets>
             <configuration>
-                <targetJdk>1.6</targetJdk>
+                <targetJdk>1.7</targetJdk>
             </configuration>
         </plugin>
         <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>findbugs-maven-plugin</artifactId>
-            <version>2.3.3</version>
+            <version>3.0.1</version>
             <configuration>
                 <threshold>Normal</threshold>
                 <effort>Max</effort>
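
Raising <source>/<target> to 1.7 drags the analysis tooling along with it: the findbugs-maven-plugin 2.x line predates Java 7 and presumably cannot read the newer class files, while 3.0.1 bundles a FindBugs release that can; the maven-pmd-plugin bump and its targetJdk setting keep PMD's parser at the same language level.
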
diff --git a/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java b/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java
index 1f184c7..ed90d4a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java
+++ b/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.charset.Charset;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -31,7 +32,7 @@
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 
 public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
-  public static int PROTOCOL_VERSION = 1;
+  public final static int PROTOCOL_VERSION = 1;
 
   protected DataFactory dataFactory = DataFactory.getInstance();
   private String source = "";
@@ -249,9 +250,16 @@
     return w;
   }
 
-  // FIXME: should do something better here, but this is OK for debugging
   public String toString() {
-    return source + ":" + streamName + ":" + new String(data) + "/" + seqID;
+    StringBuilder buffer = new StringBuilder();
+    buffer.append(source);
+    buffer.append(":");
+    buffer.append(streamName);
+    buffer.append(":");
+    buffer.append(new String(data, Charset.forName("UTF-8")));
+    buffer.append("/");
+    buffer.append(seqID);
+    return buffer.toString();
   }
 
 
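
new String(data) and String.getBytes() decode and encode with the JVM's platform default charset, so the same bytes can round-trip differently on differently configured hosts; FindBugs reports this as DM_DEFAULT_ENCODING, and the fix throughout this patch is to pass an explicit Charset. A minimal sketch of the idiom, using the StandardCharsets constants available on the Java 7 baseline this patch adopts (the patch itself spells it Charset.forName("UTF-8")):

    import java.nio.charset.StandardCharsets;

    public class CharsetDemo {
      public static void main(String[] args) {
        byte[] raw = "example".getBytes(StandardCharsets.UTF_8); // encode explicitly
        String text = new String(raw, StandardCharsets.UTF_8);   // decode explicitly
        System.out.println(text);
      }
    }

StandardCharsets.UTF_8 also avoids the per-call lookup that Charset.forName performs.
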
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
index 7cb10ec..ec37f7a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
@@ -45,7 +45,7 @@
 {
   private static Log log = LogFactory.getLog(FSMBuilder.class);
 	protected static final String SEP = "/";
-	protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];
+	protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];
 	private final Pattern ipPattern =
     Pattern.compile(".*[a-zA-Z\\-_:\\/]([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z0-9\\-_:\\/].*");
 
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
index de28597..d3a1656 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
@@ -75,13 +75,13 @@
 		 * These are used for the add_info TreeMap; keys not listed here are automatically
 		 * prepended with "COUNTER_"
 		 */
-    protected static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
+    final static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
 
-    protected static String JCDF_ID1 = "JCDF_ID1";
-    protected static String JCDF_ID2 = "JCDF_ID2";
-    protected static String JCDF_EDGE_TIME = "JCDF_E_TIME";
-    protected static String JCDF_EDGE_VOL = "JCDF_E_VOL";
-    protected static String JCDF_SEP = "@";
+    protected final static String JCDF_ID1 = "JCDF_ID1";
+    protected final static String JCDF_ID2 = "JCDF_ID2";
+    protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME";
+    protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL";
+    protected final static String JCDF_SEP = "@";
 
 
     /**
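
A non-final static field such as the old public static int PROTOCOL_VERSION can be reassigned by any class in the JVM; FindBugs reports it as MS_SHOULD_BE_FINAL. Declaring the constants final, as several hunks in this patch do, makes them genuinely constant:

    public class Protocol {
      public static final int PROTOCOL_VERSION = 1;   // was: public static int
      protected static final String JCDF_SEP = "@";   // reassignment is now a compile error
    }
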
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
index fddfeb1..234f8bb 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
@@ -23,9 +23,9 @@
 	public static final int FILESYSTEM_FSM = 1;
 	public static final int MAPREDUCE_FSM_INCOMPLETE = 2;
 	public static final int FILESYSTEM_FSM_INCOMPLETE = 3;
-	public static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" };
+	static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" };
 	public FSMType() { this.val = 0; }
 	public FSMType(int newval) { this.val = newval; }
 	public int val;
-	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }		
-}
\ No newline at end of file
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }		
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
index a2fe353..96e1bd5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
@@ -25,9 +25,9 @@
 	public static final int WRITE_LOCAL = 3;
 	public static final int WRITE_REMOTE = 4;
 	public static final int WRITE_REPLICATED = 5;
-	public static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"};
+	static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"};
 	public HDFSState() { this.val = 1; }
 	public HDFSState(int newval) { this.val = newval; }
 	public int val;
-	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }		
-}
\ No newline at end of file
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }		
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
index 883225a..3de268e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
@@ -47,7 +47,7 @@
   private static Log log = LogFactory.getLog(FSMBuilder.class);
 	protected static final String SEP = "/";
 	
-	protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
+	protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
 
 	/*
 	 * Helper function for mapper to populate TreeMap of FSMIntermedEntr
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
index ae06be5..f059049 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
@@ -27,10 +27,10 @@
 	public static final int REDUCE_REDUCER = 5;
 	public static final int SHUFFLE_LOCAL = 6;
 	public static final int SHUFFLE_REMOTE = 7;
-	public static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT", 
+	static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT", 
 	  "REDUCE_SORT", "REDUCE_REDUCER", "SHUFFLE_LOCAL", "SHUFFLE_REMOTE"};
 	public MapRedState() { this.val = 0; }
 	public MapRedState(int newval) { this.val = newval; }
 	public int val;
-	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]);	}		
-}
\ No newline at end of file
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]);	}		
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
index 56e8362..7298a5c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
@@ -23,9 +23,9 @@
 	public static final int STATE_START = 1;
 	public static final int STATE_END = 2;
 	public static final int STATE_INSTANT = 3;
-	public static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"};
+	static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"};
 	public StateType() { this.val = 0; }
 	public StateType(int newval) { this.val = newval; }
 	public int val;
-	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }
-}
\ No newline at end of file
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }
+}
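
Three small fixes repeat across these enum-like state classes. The public NAMES arrays become package-private, because even a static final array is mutable element-wise and a public one can be scribbled on by any caller (MS_MUTABLE_ARRAY). new String(NAMES[i]), which copies an existing String for no benefit (DM_STRING_CTOR), becomes String.valueOf, which simply returns its argument when that argument is already a String. And each file gains its missing final newline. The resulting shape, sketched:

    public class StateType {
      public static final int STATE_NOOP = 0;
      static final String[] NAMES = { "STATE_NOOP", "STATE_START" }; // package-private, not public
      public int val;

      public String toString() {
        assert this.val >= 0 && this.val < NAMES.length;
        return String.valueOf(NAMES[this.val]); // no copy: Strings are immutable
      }
    }
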
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
index 785ba8b..188b076 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
@@ -46,7 +46,7 @@
 {
   private static Log log = LogFactory.getLog(FSMBuilder.class);
 	protected static final String SEP = "/";
-	protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
+	protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
 	private final Pattern ipPattern =
     Pattern.compile("([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*");
   
@@ -149,8 +149,8 @@
     String [] k = key.getKey().split("/");    
 
     start_rec.time_orig_epoch = k[0];
-    start_rec.time_orig = (new Long(actual_time_ms)).toString(); // not actually used
-    start_rec.timestamp = (new Long(actual_time_ms)).toString();
+    start_rec.time_orig = (Long.valueOf(actual_time_ms)).toString(); // not actually used
+    start_rec.timestamp = (Long.valueOf(actual_time_ms)).toString();
     start_rec.time_end = new String("");
     start_rec.time_start = new String(start_rec.timestamp);
     
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
index 36b5e5e..87bf40c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
@@ -317,7 +317,7 @@
       return this.states.length;
     }
     public String [] getStates() {
-      return this.states;
+      return this.states.clone();
     }
   }
 
@@ -903,4 +903,4 @@
     return rs_tab;
   }
   
-}
\ No newline at end of file
+}
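
Returning the states field directly hands callers a reference to the object's internal array, which they could then mutate (FindBugs EI_EXPOSE_REP). clone() on an array is a cheap shallow copy, and because String is immutable a shallow copy is a complete defensive copy here. The mirror-image warning, EI_EXPOSE_REP2, applies to storing a caller-supplied array without copying it:

    public final class StateTable {
      private final String[] states;

      public StateTable(String[] states) {
        this.states = states.clone();  // defensive copy in (EI_EXPOSE_REP2)
      }

      public String[] getStates() {
        return states.clone();         // defensive copy out (EI_EXPOSE_REP)
      }
    }
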
diff --git a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java
index f92c9e9..1aa50af 100644
--- a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java
+++ b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java
@@ -20,12 +20,16 @@
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
 import java.text.ParsePosition;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
@@ -48,7 +52,7 @@
   public static String getContents(File aFile) {
     StringBuffer contents = new StringBuffer();
     try {
-      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
       try {
         String line = null; // not declared within while loop
         while ((line = input.readLine()) != null) {
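
FileReader always decodes with the platform default charset and offers no way to override it, hence the switch to an InputStreamReader over a FileInputStream with an explicit Charset. On the new Java 7 baseline the same read can be written more compactly with java.nio.file and try-with-resources; a sketch of that alternative (an assumption of mine, not what the patch uses):

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;

    public class ReadFile {
      static String getContents(File aFile) throws IOException {
        StringBuilder contents = new StringBuilder();
        // try-with-resources closes the reader even on exceptions
        try (BufferedReader input =
                 Files.newBufferedReader(aFile.toPath(), StandardCharsets.UTF_8)) {
          String line;
          while ((line = input.readLine()) != null) {
            contents.append(line).append('\n');
          }
        }
        return contents.toString();
      }
    }
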
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
index ba68d73..1a0e2a3 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
@@ -26,6 +26,7 @@
 import org.json.simple.JSONObject;
 import org.json.simple.parser.ParseException;
 
+import java.nio.charset.Charset;
 import java.util.*;
 
 /**
@@ -87,7 +88,7 @@
           result.append(o.get("exitValue"));
           result.append(": ");
           result.append((String) o.get("stdout"));
-          data = result.toString().getBytes();
+          data = result.toString().getBytes(Charset.forName("UTF-8"));
         } else {
           String stdout = (String) o.get("stdout");
           data = stdout.getBytes();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
index c2792a7..313abc8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
@@ -20,6 +20,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -72,7 +73,7 @@
       status.put("components", array);
       if(_shouldUseConnector){
         ChunkImpl chunk = new ChunkImpl(type, STREAM_NAME, seqId, status.toString()
-            .getBytes(), HeartbeatAdaptor.this);
+            .getBytes(Charset.forName("UTF-8")), HeartbeatAdaptor.this);
         dest.add(chunk);
       } else {
         sendDirectly(status.toString());
@@ -83,7 +84,7 @@
     private void sendDirectly(String data) {
       DataOutputStream dos = null;
       Socket sock = null;
-      byte[] bdata = data.getBytes();
+      byte[] bdata = data.getBytes(Charset.forName("UTF-8"));
       try {
         sock = new Socket(_host, _port);
         dos = new DataOutputStream(sock.getOutputStream());
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
index 792957c..c07f6fa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
@@ -20,8 +20,11 @@
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
 import java.rmi.ConnectException;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -97,11 +100,11 @@
 				sb.append(File.separator);
 			}
 			sb.append("jmxremote.password");
-			String jmx_pw_file = sb.toString();
+			File jmx_pw_file = new File(sb.toString());
 			shutdown = false;
 			while(!shutdown){
 				try{					
-					BufferedReader br = new BufferedReader(new FileReader(jmx_pw_file));
+					BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8")));
 					String[] creds = br.readLine().split(" ");
 					Map<String, String[]> env = new HashMap<String, String[]>();			
 					env.put(JMXConnector.CREDENTIALS, creds);
@@ -202,7 +205,7 @@
 					}
 				}
 				
-				byte[] data = json.toString().getBytes();		
+				byte[] data = json.toString().getBytes(Charset.forName("UTF-8"));		
 				sendOffset+=data.length;				
 				ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
 				long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
index b401f2e..39af580 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.util.Calendar;
 import java.util.TimeZone;
@@ -164,7 +165,7 @@
   }
 
   private int processMetrics() {
-    return addChunkToReceiver(getOozieMetrics().getBytes());
+    return addChunkToReceiver(getOozieMetrics().getBytes(Charset.forName("UTF-8")));
   }
 
   private String getOozieMetrics() {
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
index 60ac50d..8e3fd8a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
 import java.io.FileInputStream;
+import java.nio.charset.Charset;
 import java.security.KeyStore;
 import java.security.SecureRandom;
-
 import java.util.Calendar;
 import java.util.TimeZone;
 import java.util.Timer;
@@ -33,6 +33,7 @@
 import org.apache.log4j.Logger;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
+
 import static org.apache.hadoop.chukwa.datacollection.agent.ChukwaConstants.*;
 
 import com.sun.jersey.api.client.Client;
@@ -44,7 +45,6 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
-
 import javax.ws.rs.core.MediaType;
 
 public class RestAdaptor extends AbstractAdaptor {
@@ -75,7 +75,7 @@
         resource = c.resource(uri);
         bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
             String.class);
-        byte[] data = bean.getBytes();
+        byte[] data = bean.getBytes(Charset.forName("UTF-8"));
         sendOffset += data.length;
         ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
         long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
index 705d310..b37be9c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
@@ -22,6 +22,7 @@
 import java.io.InterruptedIOException;
 import java.io.ObjectInputStream;
 import java.net.*;
+import java.nio.charset.Charset;
 
 import org.apache.hadoop.chukwa.*;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -132,7 +133,7 @@
           while(running) {
             // read an event from the wire
             event = (LoggingEvent) ois.readObject();
-            byte[] bytes = layout.format(event).getBytes();
+            byte[] bytes = layout.format(event).getBytes(Charset.forName("UTF-8"));
             bytesReceived=bytes.length;
             Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this);
             dest.add(c);
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
index 258cdb5..07f6c66 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.net.*;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.HashMap;
 
@@ -63,7 +64,7 @@
     source.append(dp.getAddress());
     String dataType = type;
     byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
-    String rawPRI = new String(trimmedBuf, 1, 4);
+    String rawPRI = new String(trimmedBuf, 1, 4, Charset.forName("UTF-8"));
     int i = rawPRI.indexOf(">");
     if (i <= 3 && i > -1) {
       String priorityStr = rawPRI.substring(0,i);
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
index 4e5fcdf..5fea073 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
@@ -34,7 +34,7 @@
 
 
   public static int MAX_RETRIES = 300;
-  public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
+  static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
 
   private int attempts = 0;
   private long gracefulPeriodExpired = 0l;
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
index 1689e0e..9da09d5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
@@ -51,10 +51,10 @@
   public static final String MAX_READ_SIZE_OPT = 
       "chukwaAgent.fileTailingAdaptor.maxReadSize";
 
-  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+  static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
   
   static Logger log;
-  protected static FileTailer tailer;
+  static FileTailer tailer;
   
   static {
     tailer = null;
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
index 2ac669e..b0ef917 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
@@ -22,6 +22,8 @@
 
 import javax.jms.Message;
 import javax.jms.JMSException;
+
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 
 /**
@@ -141,7 +143,7 @@
       return null;
     }
 
-    return sb.toString().getBytes();
+    return sb.toString().getBytes(Charset.forName("UTF-8"));
   }
 
   /**
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java
index d23675e..5aed762 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.sigar;
 
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.TimerTask;
 
@@ -203,13 +204,13 @@
         json.put("disk", fsList);
       }
       json.put("timestamp", System.currentTimeMillis());
-      byte[] data = json.toString().getBytes();
+      byte[] data = json.toString().getBytes(Charset.forName("UTF-8"));
       sendOffset += data.length;
       ChunkImpl c = new ChunkImpl("SystemMetrics", "Sigar", sendOffset, data, systemMetrics);
       if(!skip) {
         receiver.add(c);
       }
-    } catch (Exception se) {
+    } catch (InterruptedException se) {
       log.error(ExceptionUtil.getStackTrace(se));
     }
   }
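
A blanket catch (Exception) swallows RuntimeExceptions such as NullPointerException along with the checked exceptions it was written for; FindBugs flags it as REC_CATCH_EXCEPTION. The patch narrows this catch to InterruptedException, which compiles only because the try body can actually throw it. A self-contained sketch of the narrowed form, with the conventional re-assertion of the interrupt flag added by me (the patch does not do this):

    public class NarrowCatch {
      public static void main(String[] args) {
        try {
          Thread.sleep(1000);                  // declares InterruptedException
        } catch (InterruptedException e) {     // specific type, not Exception
          Thread.currentThread().interrupt();  // let callers still observe the interrupt
          e.printStackTrace();
        }
      }
    }
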
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
index 1160fd3..dda7888 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
@@ -26,7 +26,9 @@
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.net.*;
+import java.nio.charset.Charset;
 import java.util.Map;
+
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
 import org.apache.log4j.Logger;
@@ -70,7 +72,7 @@
     public void run() {
       try {
         InputStream in = connection.getInputStream();
-        BufferedReader br = new BufferedReader(new InputStreamReader(in));
+        BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
         PrintStream out = new PrintStream(new BufferedOutputStream(connection
             .getOutputStream()));
         String cmd = null;
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
index 7dad2d7..4a2e996 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
@@ -30,6 +30,7 @@
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -71,59 +72,79 @@
   // boolean WRITE_CHECKPOINTS = true;
   static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
 
-  private static Logger log = Logger.getLogger(ChukwaAgent.class);  
-  private OffsetStatsManager adaptorStatsManager = null;
+  private final static Logger log = Logger.getLogger(ChukwaAgent.class);  
+  private OffsetStatsManager<Adaptor> adaptorStatsManager = null;
   private Timer statsCollector = null;
-  private static volatile Configuration conf = null;
-  private static volatile ChukwaAgent agent = null;
+  private static Configuration conf = null;
+  private volatile static ChukwaAgent agent = null;
   public Connector connector = null;
+  private boolean stopped = false;
   
-  protected ChukwaAgent() throws AlreadyRunningException {
-    this(new ChukwaConfiguration());
+  private ChukwaAgent() {
+    agent = new ChukwaAgent(new ChukwaConfiguration());
   }
 
-  public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
-    ChukwaAgent.agent = this;
+  private ChukwaAgent(Configuration conf) {
+    agent = this;
     ChukwaAgent.conf = conf;
-    
     // almost always just reading this; so use a ConcurrentHM.
     // since we wrapped the offset, it's not a structural mod.
     adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
     adaptorsByName = new HashMap<String, Adaptor>();
     checkpointNumber = 0;
+    stopped = false;
+  }
 
-    boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
+  public static ChukwaAgent getAgent() {
+    if(agent == null || agent.isStopped()) {
+        agent = new ChukwaAgent();
+    } 
+    return agent;
+  }
+
+  public static ChukwaAgent getAgent(Configuration conf) {
+    if(agent == null || agent.isStopped()) {
+      agent = new ChukwaAgent(conf);
+    }
+    return agent;
+  }
+
+  public void start() throws AlreadyRunningException {
+    boolean checkPointRestore = conf.getBoolean(
         "chukwaAgent.checkpoint.enabled", true);
-    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+    checkPointBaseName = conf.get("chukwaAgent.checkpoint.name",
         "chukwa_checkpoint_");
-    final int CHECKPOINT_INTERVAL_MS = conf.getInt(
+    final int checkPointIntervalMs = conf.getInt(
         "chukwaAgent.checkpoint.interval", 5000);
-    final int STATS_INTERVAL_MS = conf.getInt(
+    final int statsIntervalMs = conf.getInt(
         "chukwaAgent.stats.collection.interval", 10000);
-    final int STATS_DATA_TTL_MS = conf.getInt(
+    int statsDataTTLMs = conf.getInt(
         "chukwaAgent.stats.data.ttl", 1200000);
 
     if (conf.get("chukwaAgent.checkpoint.dir") != null)
       checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
     else
-      DO_CHECKPOINT_RESTORE = false;
+      checkPointRestore = false;
 
     if (checkpointDir != null && !checkpointDir.exists()) {
-      checkpointDir.mkdirs();
+      boolean result = checkpointDir.mkdirs();
+      if(!result) {
+        log.error("Failed to create check point directory.");
+      }
     }
-    tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+    String tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
     DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
 
-    log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+    log.info("Config - CHECKPOINT_BASE_NAME: [" + checkPointBaseName + "]");
     log.info("Config - checkpointDir: [" + checkpointDir + "]");
-    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + checkPointIntervalMs
         + "]");
-    log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
-    log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
+    log.info("Config - DO_CHECKPOINT_RESTORE: [" + checkPointRestore + "]");
+    log.info("Config - STATS_INTERVAL_MS: [" + statsIntervalMs + "]");
     log.info("Config - tags: [" + tags + "]");
 
-    if (DO_CHECKPOINT_RESTORE) {
-      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+    if (checkPointRestore) {
+      log.info("checkpoints are enabled, period is " + checkPointIntervalMs);
     }
 
     File initialAdaptors = null;
@@ -131,7 +152,7 @@
       initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
 
     try {
-      if (DO_CHECKPOINT_RESTORE) {
+      if (checkPointRestore) {
         restoreFromCheckpoint();
       }
     } catch (IOException e) {
@@ -152,44 +173,31 @@
       // another agent is running.
       controlSock.start(); // this sets us up as a daemon
       log.info("control socket started on port " + controlSock.portno);
-
-      // start the HTTP server with stats collection
-      try {
-        this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
-        this.statsCollector = new Timer("ChukwaAgent Stats Collector");
-
-        startHttpServer(conf);
-
-        statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
-                STATS_INTERVAL_MS, STATS_INTERVAL_MS);
-      } catch (Exception e) {
-        log.error("Couldn't start HTTP server", e);
-        throw new RuntimeException(e);
-      }
-
-      // shouldn't start checkpointing until we're finishing launching
-      // adaptors on boot
-      if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
-        checkpointer = new Timer();
-        checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
-      }
     } catch (IOException e) {
       log.info("failed to bind to socket; aborting agent launch", e);
       throw new AlreadyRunningException();
     }
 
-  }
+    // start the HTTP server with stats collection
+    try {
+      adaptorStatsManager = new OffsetStatsManager<Adaptor>(statsDataTTLMs);
+      statsCollector = new Timer("ChukwaAgent Stats Collector");
 
-  public static ChukwaAgent getAgent() {
-    if(agent == null) {
-      try {
-        agent = new ChukwaAgent();
-      } catch(AlreadyRunningException e) {
-        log.error("Chukwa Agent is already running", e);
-        agent = null;
-      }
-    } 
-    return agent;
+      startHttpServer(conf);
+
+      statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
+          statsIntervalMs, statsIntervalMs);
+    } catch (Exception e) {
+      log.error("Couldn't start HTTP server", e);
+      throw new RuntimeException(e);
+    }
+
+    // shouldn't start checkpointing until we've finished launching
+    // adaptors on boot
+    if (checkPointIntervalMs > 0 && checkpointDir != null) {
+      checkpointer = new Timer();
+      checkpointer.schedule(new CheckpointTask(), 0, checkPointIntervalMs);
+    }
   }
 
   // doesn't need an equals(), comparator, etc
@@ -219,17 +227,16 @@
     }
   }
 
-  private final Map<Adaptor, Offset> adaptorPositions;
+  private static Map<Adaptor, Offset> adaptorPositions;
 
   // basically only used by the control socket thread.
   //must be locked before access
-  private final Map<String, Adaptor> adaptorsByName;
+  private static Map<String, Adaptor> adaptorsByName;
 
   private File checkpointDir; // lock this object to indicate checkpoint in
   // progress
-  private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
+  private String checkPointBaseName; // base filename for checkpoint files
   // checkpoints
-  private static String tags = "";
 
   private Timer checkpointer;
   private volatile boolean needNewCheckpoint = false; // set to true if any
@@ -238,13 +245,13 @@
   private int checkpointNumber; // id number of next checkpoint.
   // should be protected by grabbing lock on checkpointDir
 
-  private final AgentControlSocketListener controlSock;
+  private AgentControlSocketListener controlSock;
 
   public int getControllerPort() {
     return controlSock.getPort();
   }
 
-  public OffsetStatsManager getAdaptorStatsManager() {
+  public OffsetStatsManager<Adaptor> getAdaptorStatsManager() {
     return adaptorStatsManager;
   }
 
@@ -261,12 +268,10 @@
         System.exit(0);
       }
 
-      conf = ChukwaUtil.readConfiguration();
-      agent = new ChukwaAgent(conf);
-      
+      Configuration conf = ChukwaUtil.readConfiguration();
+      agent = ChukwaAgent.getAgent(conf);
       if (agent.anotherAgentIsRunning()) {
-        System.out
-            .println("another agent is running (or port has been usurped). "
+        log.error("another agent is running (or port has been usurped). "
                 + "Bailing out now");
         throw new AlreadyRunningException();
       }
@@ -286,11 +291,12 @@
             "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
         agent.connector = (Connector) Class.forName(connectorType).newInstance();
       }
+      agent.start();
       agent.connector.start();
 
       log.info("local agent started on port " + agent.getControlSock().portno);
-      //System.out.close();
-      //System.err.close();
+      System.out.close();
+      System.err.close();
     } catch (AlreadyRunningException e) {
       log.error("agent started already on this machine with same portno;"
           + " bailing out");
@@ -304,7 +310,11 @@
   }
 
   private boolean anotherAgentIsRunning() {
-    return !controlSock.isBound();
+    boolean result = false;
+    if(controlSock!=null) {
+      result = !controlSock.isBound();
+    }
+    return result;
   }
 
   /**
@@ -475,30 +485,26 @@
     synchronized (checkpointDir) {
       String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
         public boolean accept(File dir, String name) {
-          return name.startsWith(CHECKPOINT_BASE_NAME);
+          return name.startsWith(checkPointBaseName);
         }
       });
 
       if (checkpointNames == null) {
         log.error("Unable to list files in checkpoint dir");
         return false;
-      }
-      if (checkpointNames.length == 0) {
+      } else if (checkpointNames.length == 0) {
         log.info("No checkpoints found in " + checkpointDir);
         return false;
-      }
-
-      if (checkpointNames.length > 2)
+      } else if (checkpointNames.length > 2) {
         log.warn("expected at most two checkpoint files in " + checkpointDir
             + "; saw " + checkpointNames.length);
-      else if (checkpointNames.length == 0)
-        return false;
+      }
 
       String lowestName = null;
       int lowestIndex = Integer.MAX_VALUE;
       for (String n : checkpointNames) {
         int index = Integer
-            .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
+            .parseInt(n.substring(checkPointBaseName.length()));
         if (index < lowestIndex) {
           lowestName = n;
           lowestIndex = index;
@@ -516,7 +522,7 @@
       IOException {
     log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
     BufferedReader br = new BufferedReader(new InputStreamReader(
-        new FileInputStream(checkpoint)));
+        new FileInputStream(checkpoint), Charset.forName("UTF-8")));
     String cmd = null;
     while ((cmd = br.readLine()) != null)
       processAddCommand(cmd);
@@ -534,20 +540,23 @@
       log.info("writing checkpoint " + checkpointNumber);
 
       FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
-          CHECKPOINT_BASE_NAME + checkpointNumber));
+          checkPointBaseName + checkpointNumber));
       PrintWriter out = new PrintWriter(new BufferedWriter(
-          new OutputStreamWriter(fos)));
+          new OutputStreamWriter(fos, Charset.forName("UTF-8"))));
 
       for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
         out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
       }
 
       out.close();
-      File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
+      File lastCheckpoint = new File(checkpointDir, checkPointBaseName
           + (checkpointNumber - 1));
       log.debug("hopefully removing old checkpoint file "
           + lastCheckpoint.getAbsolutePath());
-      lastCheckpoint.delete();
+      boolean result = lastCheckpoint.delete();
+      if(!result) {
+        log.warn("Unable to delete lastCheckpoint file: "+lastCheckpoint.getAbsolutePath());
+      }
       checkpointNumber++;
     }
   }
@@ -729,11 +738,27 @@
     adaptorsByName.clear();
     adaptorPositions.clear();
     adaptorStatsManager.clear();
+    agent.stop();
     if (exit)
       System.exit(0);
   }
 
   /**
+   * Set agent into stop state.
+   */
+  private void stop() {
+    stopped = true;
+  }
+
+  /**
+   * Check if agent is in stop state.
+   * @return true if agent is in stop state.
+   */
+  private boolean isStopped() {
+    return stopped;
+  }
+
+  /**
    * Returns the control socket for this agent.
    */
   private AgentControlSocketListener getControlSock() {
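
The ChukwaAgent rework separates wiring from side effects: both constructors become private and only initialize fields, getAgent()/getAgent(conf) become the sole way to obtain the singleton (building a fresh one when the previous instance was stopped), and the control socket, stats timer, HTTP server, and checkpointer all move into an explicit start() that can still throw AlreadyRunningException. A simplified sketch of that lifecycle; names are illustrative, and the synchronized keyword is my addition, since the patch's bare check-then-create in getAgent() would still race if two threads called it concurrently:

    public final class Agent {
      private static Agent instance;
      private volatile boolean stopped;

      private Agent() {                 // wiring only, no side effects
      }

      public static synchronized Agent getAgent() {
        if (instance == null || instance.stopped) {
          instance = new Agent();
        }
        return instance;
      }

      public void start() {
        // bind sockets, start timers and the HTTP server here
      }

      public void shutdown() {
        stopped = true;                 // next getAgent() builds a fresh instance
      }
    }
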
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java
index cbabbc0..f549614 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java
@@ -41,33 +41,24 @@
   
   private static ChukwaRestServer instance = null;
   
-  public static void startInstance(Configuration conf) throws Exception{
+  public static synchronized void startInstance(Configuration conf) throws Exception{
     if(instance == null){
-      synchronized(ChukwaRestServer.class) {
-        if(instance == null){
-          instance = new ChukwaRestServer(conf);
-          instance.start();
-        }        
-      }
+      instance = new ChukwaRestServer(conf);
+      instance.start();
     }
   }
-  
-  public static void stopInstance() throws Exception {
+
+  public static synchronized void stopInstance() throws Exception {
     if(instance != null) {
-      synchronized(ChukwaRestServer.class) {
-        if(instance != null){
-          instance.stop();
-          instance = null;
-        }
-      }
+      instance.stop();
+      instance = null;
     }
-    
   }
-  
+
   private ChukwaRestServer(Configuration conf){
     this.conf = conf;
   }
-  
+
   private void start() throws Exception{
     int portNum = conf.getInt(AGENT_HTTP_PORT, 9090);
     String jaxRsAddlPackages = conf.get(AGENT_REST_CONTROLLER_PACKAGES);
@@ -127,7 +118,7 @@
   
     log.info("started Chukwa http agent interface on port " + portNum);
   }
-  
+
   private void stop() throws Exception{
     jettyServer.stop();
     log.info("Successfully stopped Chukwa http agent interface");
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java
index 5d3f71a..b7c912b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java
@@ -23,13 +23,13 @@
 public class Examples {
   public static final AdaptorConfig CREATE_ADAPTOR_SAMPLE = new AdaptorConfig();
   public static final AdaptorInfo ADAPTOR_STATUS_SAMPLE = new AdaptorInfo();
-  public static final List<AdaptorAveragedRate> ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
+  final static List<AdaptorAveragedRate> ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
   public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_MINUTE = new AdaptorAveragedRate();
   public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_FIVE_MINUTE = new AdaptorAveragedRate();
   public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_TEN_MINUTE = new AdaptorAveragedRate();
   
   public static final AdaptorInfo SYS_ADAPTOR_STATUS_SAMPLE = new AdaptorInfo();
-  public static final List<AdaptorAveragedRate> SYS_ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
+  final static List<AdaptorAveragedRate> SYS_ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
   public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_MINUTE = new AdaptorAveragedRate();
   public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_FIVE_MINUTE = new AdaptorAveragedRate();
   public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_TEN_MINUTE = new AdaptorAveragedRate();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
deleted file mode 100644
index 2a17417..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.datacollection.collector;
-
-
-import org.mortbay.jetty.*;
-import org.mortbay.jetty.nio.SelectChannelConnector;
-import org.mortbay.jetty.servlet.*;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
-import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
-import org.apache.hadoop.chukwa.datacollection.writer.*;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import javax.servlet.http.HttpServlet;
-import java.io.File;
-import java.util.*;
-
-@Deprecated
-public class CollectorStub {
-
-  static int THREADS = 120;
-  public static Server jettyServer = null;
-
-  public static void main(String[] args) {
-
-    try {
-      if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) {
-        System.out.println("usage: Normally you should just invoke CollectorStub without arguments.");
-        System.out.println("A number of options can be specified here for debugging or special uses.  e.g.: ");
-        System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>"
-                    + "\n\tservlet=<classname>@path");
-        System.out.println("Command line options will override normal configuration.");
-        System.exit(0);
-      }
-
-      ChukwaConfiguration conf = new ChukwaConfiguration();
-      
-      try {
-        Configuration collectorConf = new Configuration(false);
-        collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml"));
-        collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml"));
-      } catch(Exception e) {e.printStackTrace();}
-      
-      int portNum = conf.getInt("chukwaCollector.http.port", 9999);
-      THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
-
-      // pick a writer.
-      ChukwaWriter w = null;
-      Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
-      ServletCollector servletCollector = new ServletCollector(conf);
-      for(String arg: args) {
-        if(arg.startsWith("writer=")) {       //custom writer class
-          String writerCmd = arg.substring("writer=".length());
-          if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) {
-            boolean verbose = !writerCmd.equals("pretend-quietly");
-            w = new ConsoleWriter(verbose);
-            w.init(conf);
-            servletCollector.setWriter(w);
-          } else 
-            conf.set("chukwaCollector.writerClass", writerCmd);
-        } else if(arg.startsWith("servlet=")) {     //adding custom servlet
-           String servletCmd = arg.substring("servlet=".length()); 
-           String[] halves = servletCmd.split("@");
-           try {
-             Class<?> servletClass = Class.forName(halves[0]);
-             HttpServlet srvlet = (HttpServlet) servletClass.newInstance();
-             if(!halves[1].startsWith("/"))
-               halves[1] = "/" + halves[1];
-             servletsToAdd.put(halves[1], srvlet);
-           } catch(Exception e) {
-             e.printStackTrace();
-           }
-        } else if(arg.startsWith("portno=")) {
-          portNum = Integer.parseInt(arg.substring("portno=".length()));
-        } else { //unknown arg
-          System.out.println("WARNING: unknown command line arg " + arg);
-          System.out.println("Invoke collector with command line arg 'help' for usage");
-        }
-      }
-
-      // Set up jetty connector
-      SelectChannelConnector jettyConnector = new SelectChannelConnector();
-      jettyConnector.setLowResourcesConnections(THREADS - 10);
-      jettyConnector.setLowResourceMaxIdleTime(1500);
-      jettyConnector.setPort(portNum);
-      
-      // Set up jetty server proper, using connector
-      jettyServer = new Server(portNum);
-      jettyServer.setConnectors(new Connector[] { jettyConnector });
-      org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
-      pool.setMaxThreads(THREADS);
-      jettyServer.setThreadPool(pool);
-      
-      // Add the collector servlet to server
-      Context root = new Context(jettyServer, "/", Context.SESSIONS);
-      root.addServlet(new ServletHolder(servletCollector), "/*");
-      
-      if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
-        root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
-
-      if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
-        root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
-
-      
-      root.setAllowNullPathInfo(false);
-
-      // Add in any user-specified servlets
-      for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) {
-        root.addServlet(new ServletHolder(e.getValue()), e.getKey());
-      }
-      
-      // And finally, fire up the server
-      jettyServer.start();
-      jettyServer.setStopAtShutdown(true);
-
-      System.out.println("started Chukwa http collector on port " + portNum);
-      System.out.close();
-      System.err.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-
-  }
-
-}
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
index c3e72d2..d4c2df4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
@@ -38,7 +38,7 @@
 
   private static final long serialVersionUID = -4627538252371890849L;
   
-  protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
+  protected final static Logger log = Logger.getLogger(CommitCheckServlet.class);
   CommitCheckThread commitCheck;
   Configuration conf;
     //interval at which to scan the filesystem, ms
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
index b1ead05..613fa3e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
@@ -67,7 +67,7 @@
   long totalStoredSize = 0;
 
   private static final long serialVersionUID = -4602082382919009285L;
-  protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
+  protected final static Logger log = Logger.getLogger(LogDisplayServlet.class);
   
   public LogDisplayServlet() {
     conf = new Configuration();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
index 03a88df..69e3566 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
@@ -27,6 +27,7 @@
 import java.io.PrintWriter;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,6 +36,7 @@
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.log4j.Logger;
 
@@ -128,14 +130,14 @@
         e.printStackTrace();
       }
       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
-          .getOutputStream()));
+          .getOutputStream(), Charset.forName("UTF-8")));
       if(id != null)
         bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
       else
         bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
       bw.flush();
       BufferedReader br = new BufferedReader(new InputStreamReader(s
-          .getInputStream()));
+          .getInputStream(), Charset.forName("UTF-8")));
       String resp = br.readLine();
       if (resp != null) {
         String[] fields = resp.split(" ");
@@ -153,15 +155,14 @@
         s.setSoTimeout(60000);
       } catch (SocketException e) {
         log.warn("Error while settin soTimeout to 60000");
-        e.printStackTrace();
       }
       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
-          .getOutputStream()));
+          .getOutputStream(), Charset.forName("UTF-8")));
       bw.println("SHUTDOWN " + id);
       bw.flush();
 
       BufferedReader br = new BufferedReader(new InputStreamReader(s
-          .getInputStream()));
+          .getInputStream(), Charset.forName("UTF-8")));
       String resp = br.readLine();
       if (resp == null || !resp.startsWith("OK")) {
         log.error("adaptor unregister error, id: " + id);
@@ -348,12 +349,12 @@
       e.printStackTrace();
     }
     PrintWriter bw = new PrintWriter(
-        new OutputStreamWriter(s.getOutputStream()));
+        new OutputStreamWriter(s.getOutputStream(), Charset.forName("UTF-8")));
 
     bw.println("LIST");
     bw.flush();
     BufferedReader br = new BufferedReader(new InputStreamReader(s
-        .getInputStream()));
+        .getInputStream(), Charset.forName("UTF-8")));
     String ln;
     Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
     while ((ln = br.readLine()) != null) {
@@ -370,11 +371,13 @@
                                                                         // -
                                                                         // paren
           long offset = Long.parseLong(parts[parts.length - 1]);
-          String tmpParams = parts[3];
+          StringBuilder tmpParams = new StringBuilder();
+          tmpParams.append(parts[3]);
           for (int i = 4; i < parts.length - 1; i++) {
-            tmpParams += " " + parts[i];
+            tmpParams.append(" ");
+            tmpParams.append(parts[i]);
           }
-          listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams,
+          listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams.toString(),
               offset));
         }
       }
@@ -563,8 +566,7 @@
     if (adaptorID != null) {
       log.info("Successfully added adaptor, id is:" + adaptorID);
     } else {
-      System.err.println("Agent reported failure to add adaptor, adaptor id returned was:"
-              + adaptorID);
+      log.error("Agent reported failure to add adaptor.");
     }
     return adaptorID;
   }
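
Building tmpParams with += in a loop re-copies the accumulated String on every iteration, making an n-part join O(n^2) (FindBugs SBSC_USE_STRINGBUFFER_CONCATENATION); StringBuilder appends in amortized constant time. The pattern, sketched on an illustrative parts array:

    String[] parts = { "id", "class", "type", "p3", "p4", "p5", "offset" };
    StringBuilder tmpParams = new StringBuilder(parts[3]);
    for (int i = 4; i < parts.length - 1; i++) {
      tmpParams.append(' ').append(parts[i]);  // no intermediate String per pass
    }
    String joined = tmpParams.toString();      // "p3 p4 p5"
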
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
index 11e9305..6f818e4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
@@ -44,7 +44,7 @@
  */
 public class AsyncAckSender extends ChukwaHttpSender{
   
-  protected static Logger log = Logger.getLogger(AsyncAckSender.class);
+  protected final static Logger log = Logger.getLogger(AsyncAckSender.class);
   /*
    * Represents the state required for an asynchronous ack.
    * 
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
index 22460d7..1c8c3d2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
@@ -26,6 +26,7 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -354,7 +355,7 @@
           }
         });
 
-    pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
+    pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT));
 
     method.setParams(pars);
     method.setPath(dest);
@@ -385,7 +386,7 @@
     // Get the response body
     byte[] resp_buf = method.getResponseBody();
     rstream = new ByteArrayInputStream(resp_buf);
-    BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
+    BufferedReader br = new BufferedReader(new InputStreamReader(rstream, Charset.forName("UTF-8")));
     String line;
     List<String> resp = new ArrayList<String>();
     while ((line = br.readLine()) != null) {
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
index 30442e2..dea2d07 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer.hbase;
 
-import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Type;
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -36,6 +36,7 @@
 public class Reporter {
   private ArrayList<Put> meta = new ArrayList<Put>();
   private MessageDigest md5 = null;
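+  // Shared Charset constant so every encode/decode below is explicitly UTF-8.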
+  private final static Charset UTF8 = Charset.forName("UTF-8");
 
   public Reporter() throws NoSuchAlgorithmException {
     md5 = MessageDigest.getInstance("md5");
@@ -48,11 +49,11 @@
     try {
       Type metaType = new TypeToken<Map<String, String>>(){}.getType();
       Map<String, String> meta = new HashMap<String, String>();
-      meta.put("sig", new String(value, "UTF-8"));
+      meta.put("sig", new String(value, UTF8));
       meta.put("type", "source");
       Gson gson = new Gson();
       buffer = gson.toJson(meta, metaType);
-      put(type.getBytes(), source.getBytes(), buffer.toString().getBytes());
+      put(type.getBytes(UTF8), source.getBytes(UTF8), buffer.toString().getBytes(UTF8));
     } catch (Exception e) {
       Log.warn("Error encoding metadata.");
       Log.warn(e);
@@ -70,7 +71,7 @@
       meta.put("type", "metric");
       Gson gson = new Gson();
       buffer = gson.toJson(meta, metaType);
-      put(type.getBytes(), metric.getBytes(), buffer.toString().getBytes());
+      put(type.getBytes(UTF8), metric.getBytes(UTF8), buffer.toString().getBytes(UTF8));
     } catch (Exception e) {
       Log.warn("Error encoding metadata.");
       Log.warn(e);
@@ -78,12 +79,12 @@
   }
 
   public void put(String key, String source, String info) {
-    put(key.getBytes(), source.getBytes(), info.getBytes());
+    put(key.getBytes(UTF8), source.getBytes(UTF8), info.getBytes(UTF8));
   }
 
   public void put(byte[] key, byte[] source, byte[] info) {
     Put put = new Put(key);
-    put.addColumn("k".getBytes(), source, info);
+    put.addColumn("k".getBytes(UTF8), source, info);
     meta.add(put);
   }
 
@@ -97,25 +98,20 @@
 
   private byte[] getHash(String key) {
     byte[] hash = new byte[5];
-    System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5);
+    System.arraycopy(md5.digest(key.getBytes(UTF8)), 0, hash, 0, 5);
     return hash;
   }
 
   public void putClusterName(String type, String clusterName) {
     byte[] value = getHash(clusterName);
     String buffer;
-    try {
-      Type metaType = new TypeToken<Map<String, String>>(){}.getType();
-      Map<String, String> meta = new HashMap<String, String>();
-      meta.put("sig", new String(value, "UTF-8"));
-      meta.put("type", "cluster");
-      Gson gson = new Gson();
-      buffer = gson.toJson(meta, metaType);
-      put(type.getBytes(), clusterName.getBytes(), buffer.toString().getBytes());
-    } catch (UnsupportedEncodingException e) {
-      Log.warn("Error encoding metadata.");
-      Log.warn(e);
-    }
+    Type metaType = new TypeToken<Map<String, String>>(){}.getType();
+    Map<String, String> meta = new HashMap<String, String>();
+    meta.put("sig", new String(value, UTF8));
+    meta.put("type", "cluster");
+    Gson gson = new Gson();
+    buffer = gson.toJson(meta, metaType);
+    put(type.getBytes(UTF8), clusterName.getBytes(UTF8), buffer.toString().getBytes(UTF8));
   }
 
 }
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index f67fe87..bf64b24 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.chukwa.datacollection.writer.solr;
 
+import java.io.IOException;
+import java.nio.charset.Charset;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -34,12 +36,13 @@
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrServer;
 import org.apache.solr.common.SolrInputDocument;
 
 public class SolrWriter extends PipelineableWriter {
   private static Logger log = Logger.getLogger(SolrWriter.class);
-  private static CloudSolrServer server;
+  private CloudSolrServer server;
   private final static String ID = "id";
   private final static String SEQ_ID = "seqId";
   private final static String DATA_TYPE = "type";
@@ -64,8 +67,10 @@
       throw new WriterException("Solr server address is not defined.");
     }
     String collection = c.get("solr.collection", "logs");
-    server = new CloudSolrServer(serverName);
-    server.setDefaultCollection(collection);
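+    // Create the Solr client once per writer instance; later init() calls reuse it.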
+    if(server == null) {
+      server = new CloudSolrServer(serverName);
+      server.setDefaultCollection(collection);
+    }
   }
 
   @Override
@@ -84,10 +89,10 @@
         doc.addField(SOURCE, chunk.getSource());
         doc.addField(SEQ_ID, chunk.getSeqID());
         doc.addField(DATA_TYPE, chunk.getDataType());
-        doc.addField(DATA, new String(chunk.getData()));
-        
-        // TODO: improve parsing logic for more sophisticated tagging
-        String data = new String(chunk.getData());
+        // TODO: improve parsing logic for more sophisticated tagging
+        String data = new String(chunk.getData(), Charset.forName("UTF-8"));
+        doc.addField(DATA, data);
         Matcher m = userPattern.matcher(data);
         if(m.find()) {
           doc.addField(USER, m.group(1));
@@ -109,7 +114,7 @@
         }
         server.add(doc);
         server.commit();
-      } catch (Exception e) {
+      } catch (SolrServerException | IOException e) {
         log.warn("Failed to store data to Solr Cloud.");
         log.warn(ExceptionUtil.getStackTrace(e));
       }
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 8075f4d..f828ff1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -22,6 +22,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -71,19 +72,20 @@
   static double RESOLUTION = 360;
-  static int MINUTE = 60000; //60 milliseconds
+  static int MINUTE = 60000; // one minute, in milliseconds
   final static int SECOND = (int) TimeUnit.SECONDS.toMillis(1);
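+  // Explicit UTF-8 charset for all row key, family, and qualifier encoding below.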
+  private final static Charset UTF8 = Charset.forName("UTF-8");
 
-  static byte[] COLUMN_FAMILY = "t".getBytes();
-  static byte[] ANNOTATION_FAMILY = "a".getBytes();
-  static byte[] KEY_NAMES = "k".getBytes();
-  static byte[] CHART_TYPE = "chart_meta".getBytes();
-  static byte[] CHART_FAMILY = "c".getBytes();
-  static byte[] COMMON_FAMILY = "c".getBytes();
-  static byte[] WIDGET_TYPE = "widget_meta".getBytes();
-  static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes();
+  final static byte[] COLUMN_FAMILY = "t".getBytes(UTF8);
+  final static byte[] ANNOTATION_FAMILY = "a".getBytes(UTF8);
+  final static byte[] KEY_NAMES = "k".getBytes(UTF8);
+  final static byte[] CHART_TYPE = "chart_meta".getBytes(UTF8);
+  final static byte[] CHART_FAMILY = "c".getBytes(UTF8);
+  final static byte[] COMMON_FAMILY = "c".getBytes(UTF8);
+  final static byte[] WIDGET_TYPE = "widget_meta".getBytes(UTF8);
+  final static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes(UTF8);
   private static final String CHUKWA = "chukwa";
   private static final String CHUKWA_META = "chukwa_meta";
   private static long MILLISECONDS_IN_DAY = 86400000L;
-  protected static Connection connection = null;
+  private static Connection connection = null;
 
   public ChukwaHBaseStore() {
     super();
@@ -171,7 +173,7 @@
             byte[] key = CellUtil.cloneQualifier(kv);
             long timestamp = ByteBuffer.wrap(key).getLong();
             double value = Double
-                .parseDouble(new String(CellUtil.cloneValue(kv), "UTF-8"));
+                .parseDouble(new String(CellUtil.cloneValue(kv), UTF8));
             series.add(timestamp, value);
           }
         }
@@ -179,7 +181,7 @@
         currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
       }
       table.close();
-    } catch (Exception e) {
+    } catch (IOException e) {
       closeHBase();
       LOG.error(ExceptionUtil.getStackTrace(e));
     }
@@ -191,12 +193,12 @@
     try {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
-      Get get = new Get(metricGroup.getBytes());
+      Get get = new Get(metricGroup.getBytes(UTF8));
       Result result = table.get(get);
       for (Cell kv : result.rawCells()) {
-        JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), "UTF-8"));
+        JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), UTF8));
         if (json.get("type").equals("metric")) {
-          familyNames.add(new String(CellUtil.cloneQualifier(kv), "UTF-8"));
+          familyNames.add(new String(CellUtil.cloneQualifier(kv), UTF8));
         }
       }
       table.close();
@@ -219,7 +221,7 @@
       Iterator<Result> it = rs.iterator();
       while (it.hasNext()) {
         Result result = it.next();
-        metricGroups.add(new String(result.getRow(), "UTF-8"));
+        metricGroups.add(new String(result.getRow(), UTF8));
       }
       table.close();
     } catch (Exception e) {
@@ -241,9 +243,9 @@
       while (it.hasNext()) {
         Result result = it.next();
         for (Cell cell : result.rawCells()) {
-          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
           if (json!=null && json.get("type")!=null && json.get("type").equals("source")) {
-            pk.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+            pk.add(new String(CellUtil.cloneQualifier(cell), UTF8));
           }
         }
       }
@@ -296,7 +298,7 @@
         for(Cell cell : result.rawCells()) {
           byte[] dest = new byte[5];
           System.arraycopy(CellUtil.cloneRow(cell), 3, dest, 0, 5);
-          String source = new String(dest);
+          String source = new String(dest, UTF8);
           long time = cell.getTimestamp();
           // Time display in x axis
           long delta = time - startTime;
@@ -306,11 +308,11 @@
           if (keyMap.containsKey(source)) {
             y = keyMap.get(source);
           } else {
-            keyMap.put(source, new Integer(index));
+            keyMap.put(source, Integer.valueOf(index));
             y = index;
             index++;
           }
-          double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
+          double v = Double.parseDouble(new String(CellUtil.cloneValue(cell), UTF8));
           heatmap.put(x, y, v);
           if (v > max) {
             max = v;
@@ -355,9 +357,9 @@
       while (it.hasNext()) {
         Result result = it.next();
         for (Cell cell : result.rawCells()) {
-          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
           if (json.get("type").equals("cluster")) {
-            clusters.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+            clusters.add(new String(CellUtil.cloneQualifier(cell), UTF8));
           }
         }
       }
@@ -382,10 +384,10 @@
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get get = new Get(CHART_TYPE);
       Result r = table.get(get);
-      byte[] value = r.getValue(CHART_FAMILY, id.getBytes());
+      byte[] value = r.getValue(CHART_FAMILY, id.getBytes(UTF8));
       Gson gson = new Gson();
       if(value!=null) {
-        chart = gson.fromJson(new String(value), Chart.class);
+        chart = gson.fromJson(new String(value, UTF8), Chart.class);
       }
       table.close();
     } catch (Exception e) {
@@ -408,7 +410,7 @@
       Put put = new Put(CHART_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(chart);
-      put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+      put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
     } catch (Exception e) {
@@ -437,7 +439,7 @@
       s.setLineOptions(l);
       series.add(s);
     }
-    chart.SetSeries(series);
+    chart.setSeries(series);
     return createChart(chart);
     
   }
@@ -469,7 +471,7 @@
       Put put = new Put(CHART_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(chart);
-      put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+      put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
     } catch (Exception e) {
@@ -499,8 +501,8 @@
       }
       // Figure out the time range and determine the best resolution
       // to fetch the data
-      long range = Math.round((endTime - startTime)
-        / (MINUTES_IN_HOUR * MINUTE));
+      long range = (endTime - startTime)
+        / (long) (MINUTES_IN_HOUR * MINUTE);
       long sampleRate = 1;
       if (range <= 1) {
         sampleRate = 5;
@@ -512,7 +514,7 @@
         sampleRate = 87600;
       }
       double smoothing = (endTime - startTime)
-          / (sampleRate * SECOND ) / RESOLUTION;
+          / (double) (sampleRate * SECOND) / RESOLUTION;
 
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA));
@@ -550,7 +552,7 @@
               byte[] key = CellUtil.cloneQualifier(kv);
               long timestamp = ByteBuffer.wrap(key).getLong();
               double value = Double.parseDouble(new String(CellUtil.cloneValue(kv),
-                  "UTF-8"));
+                  UTF8));
               if(initial==0) {
                 filteredValue = value;
               }
@@ -558,7 +560,7 @@
               lastTime = timestamp;
               // Determine if there is any gap, if there is gap in data, reset
               // calculation.
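+              // Tolerate up to five sample intervals between points before resetting.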
-              if (elapsedTime > sampleRate) {
+              if (elapsedTime > (sampleRate * 5)) {
                 filteredValue = 0.0d;
               } else {
                 if (smoothing != 0.0d) {
@@ -587,7 +589,7 @@
         list.add(clone);
       }
       table.close();
-    } catch (Exception e) {
+    } catch (IOException | CloneNotSupportedException e) {
       closeHBase();
       LOG.error(ExceptionUtil.getStackTrace(e));
     }
@@ -622,7 +624,7 @@
             continue;
           }
           Gson gson = new Gson();
-          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
           list.add(widget);
           c++;
         }
@@ -658,7 +660,7 @@
         Result result = it.next();
         for(Cell kv : result.rawCells()) {
           Gson gson = new Gson();
-          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
           list.add(widget);
         }
       }
@@ -683,11 +685,11 @@
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get widget = new Get(WIDGET_TYPE);
-      widget.addColumn(COMMON_FAMILY, title.getBytes());
+      widget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
       Result rs = table.get(widget);
-      byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes());
+      byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes(UTF8));
       Gson gson = new Gson();
-      w = gson.fromJson(new String(buffer), Widget.class);
+      w = gson.fromJson(new String(buffer, UTF8), Widget.class);
       table.close();
     } catch (Exception e) {
       closeHBase();
@@ -708,7 +710,7 @@
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get widgetTest = new Get(WIDGET_TYPE);
-      widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes());
+      widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8));
       if (table.exists(widgetTest)) {
         LOG.warn("Widget: " + widget.getTitle() + " already exists.");
         created = false;
@@ -716,7 +718,7 @@
         Put put = new Put(WIDGET_TYPE);
         Gson gson = new Gson();
         String buffer = gson.toJson(widget);
-        put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes());
+        put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8), buffer.getBytes(UTF8));
         table.put(put);
         created = true;
       }
@@ -741,12 +743,12 @@
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Delete oldWidget = new Delete(WIDGET_TYPE);
-      oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+      oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
       table.delete(oldWidget);
       Put put = new Put(WIDGET_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(widget);
-      put.addColumn(COMMON_FAMILY, title.getBytes(), buffer.getBytes());
+      put.addColumn(COMMON_FAMILY, title.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
       result = true;
@@ -772,7 +774,7 @@
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Delete oldWidget = new Delete(WIDGET_TYPE);
-      oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+      oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
       table.delete(oldWidget);
       table.close();
       result = true;
@@ -790,7 +792,7 @@
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get dashboardTest = new Get(DASHBOARD_TYPE);
-      dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes());
+      dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes(UTF8));
       exists = table.exists(dashboardTest);
       table.close();
     } catch (Exception e) {
@@ -931,19 +933,19 @@
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get dashboard = new Get(DASHBOARD_TYPE);
-      dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+      dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
       Result rs = table.get(dashboard);
-      byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes());
+      byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
       if(buffer == null) {
         // If user dashboard is not found, use default dashboard.
         key = new StringBuilder().append(id).append("|").toString();
         dashboard = new Get(DASHBOARD_TYPE);
-        dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+        dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
         rs = table.get(dashboard);
-        buffer = rs.getValue(COMMON_FAMILY, key.getBytes());        
+        buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
       }
       Gson gson = new Gson();
-      dash = gson.fromJson(new String(buffer), Dashboard.class);
+      dash = gson.fromJson(new String(buffer, UTF8), Dashboard.class);
       table.close();
     } catch (Exception e) {
       closeHBase();
@@ -964,7 +966,7 @@
       Put put = new Put(DASHBOARD_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(dash);
-      put.addColumn(COMMON_FAMILY, key.getBytes(), buffer.getBytes());
+      put.addColumn(COMMON_FAMILY, key.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
       result = true;
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
index eb79cd7..4f5f289 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.chukwa.extraction.hbase;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -37,7 +38,7 @@
   protected String sourceHelper;
 
   protected byte[] key = null;
-  byte[] CF = "t".getBytes();
+  byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
 
   boolean chunkInErrorSaved = false;
   ArrayList<Put> output = null;
@@ -70,14 +71,14 @@
     byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
     Put put = new Put(key);
     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-    put.add(CF, timeInBytes, time, value);
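+    // Put.add() is deprecated in current HBase client APIs; addColumn() is the replacement.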
+    put.addColumn(CF, timeInBytes, time, value);
     output.add(put);
     reporter.putMetric(chunk.getDataType(), primaryKey);
     reporter.putSource(chunk.getDataType(), source);
   }
 
   public void addRecord(String primaryKey, String value) {
-    addRecord(primaryKey, value.getBytes());
+    addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
   }
 
   /**
@@ -96,7 +97,7 @@
     byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
     Put put = new Put(key);
     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-    put.add(CF, timeInBytes, time, value);
+    put.addColumn(CF, timeInBytes, time, value);
     output.add(put);
     reporter.putMetric(chunk.getDataType(), primaryKey);
   }
@@ -126,7 +127,7 @@
     Put put = new Put(key);
     String family = "a";
     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-    put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+    put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time,
+        chunk.getTags().getBytes(Charset.forName("UTF-8")));
     output.add(put);
   }
 
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
index 2da64a3..483ac71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -17,34 +17,46 @@
  */
 package org.apache.hadoop.chukwa.extraction.hbase;
 
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.chukwa.util.HBaseUtil;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 
 public class DefaultProcessor extends AbstractProcessor {
-	
+
   public DefaultProcessor() throws NoSuchAlgorithmException {
-	super();
-	// TODO Auto-generated constructor stub
+    super();
   }
 
-static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+  final static Logger LOG = Logger.getLogger(DefaultProcessor.class);
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-	  byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
-	  Put put = new Put(key);
-	  byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-	  put.add("t".getBytes(), timeInBytes, chunk.getData());
-	  output.add(put);
-	  JSONObject json = new JSONObject();
-	  json.put("sig", key);
-	  json.put("type", "unknown");
-	  reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+    byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(),
+        chunk.getSource());
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.addColumn("t".getBytes(Charset.forName("UTF-8")), timeInBytes,
+        chunk.getData());
+    output.add(put);
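+    // Serialize the chunk metadata to JSON with Gson, replacing the json-simple JSONObject.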
+    Type defaultType = new TypeToken<Map<String, String>>() {
+    }.getType();
+    Gson gson = new Gson();
+    Map<String, String> meta = new HashMap<String, String>();
+    meta.put("sig", new String(key, Charset.forName("UTF-8")));
+    meta.put("type", "unknown");
+    String buffer = gson.toJson(meta, defaultType);
+    reporter.put(chunk.getDataType(), chunk.getSource(), buffer);
   }
 
 }
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
index 19df607..de64a0d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.extraction.hbase;
 
-
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
 
-import org.apache.hadoop.chukwa.util.HBaseUtil;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -37,15 +37,14 @@
   static final String recordNameField = "recordName";
   static final String hostName = "Hostname";
   static final String processName = "ProcessName";
-  static final byte[] cf = "t".getBytes();
+  static final byte[] cf = "t".getBytes(Charset.forName("UTF-8"));
 
   public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
   }
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-    try {
-      String body = new String(recordEntry);
+      String body = new String(recordEntry, Charset.forName("UTF-8"));
       int start = body.indexOf('{');
       JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
 
@@ -56,10 +55,8 @@
       if(json.get(processName)!=null) {
         src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
       }
-      @SuppressWarnings("unchecked")
-      Iterator<String> ki = json.keySet().iterator();
-      while (ki.hasNext()) {
-        String keyName = ki.next();
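+      // Iterate map entries directly so each value is read without a second lookup.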
+      for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+        String keyName = entry.getKey();
         if (timestampField.intern() == keyName.intern()) {
           continue;
         } else if (contextNameField.intern() == keyName.intern()) {
@@ -71,20 +68,14 @@
         } else if (processName.intern() == keyName.intern()) {
           continue;
         } else {
-          if (json.get(keyName) != null) {
-            String v = json.get(keyName).toString();
+          if(entry.getValue() != null) {
+            String v = entry.getValue().toString();
             String primaryKey = new StringBuilder(contextName).append(".")
                 .append(recordName).append(".").append(keyName).toString();
-            addRecord(time, primaryKey, src, v.getBytes(), output);
+            addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
           }
         }
       }
-
-    } catch (Exception e) {
-      LOG.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
-          e);
-      throw e;
-    }
   }
 
 }
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
index dcbe2d4..0682c71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -22,43 +22,43 @@
 import java.util.Date;
 
 public class LogEntry {
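+  // SimpleDateFormat is not thread-safe, so each LogEntry keeps its own formatter instance.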
-	private final static SimpleDateFormat sdf = new SimpleDateFormat(
-			"yyyy-MM-dd HH:mm");
+  private final SimpleDateFormat sdf = new SimpleDateFormat(
+      "yyyy-MM-dd HH:mm");
 
-	private Date date;
-	private String logLevel;
-	private String className;
-	private String body;
+  private Date date;
+  private String logLevel;
+  private String className;
+  private String body;
 
-	public LogEntry(String recordEntry) throws ParseException {
-		String dStr = recordEntry.substring(0, 23);
-		date = sdf.parse(dStr);
-		int start = 24;
-		int idx = recordEntry.indexOf(' ', start);
-		logLevel = recordEntry.substring(start, idx);
-		start = idx + 1;
-		idx = recordEntry.indexOf(' ', start);
-		className = recordEntry.substring(start, idx - 1);
-		body = recordEntry.substring(idx + 1);
-	}
+  public LogEntry(String recordEntry) throws ParseException {
+    String dStr = recordEntry.substring(0, 23);
+    date = sdf.parse(dStr);
+    int start = 24;
+    int idx = recordEntry.indexOf(' ', start);
+    logLevel = recordEntry.substring(start, idx);
+    start = idx + 1;
+    idx = recordEntry.indexOf(' ', start);
+    className = recordEntry.substring(start, idx - 1);
+    body = recordEntry.substring(idx + 1);
+  }
 
-	public Date getDate() {
-		return date;
-	}
+  public Date getDate() {
+    return (Date) date.clone();
+  }
 
-	public void setDate(Date date) {
-		this.date = date;
-	}
+  public void setDate(Date date) {
+    this.date = (Date) date.clone();
+  }
 
-	public String getLogLevel() {
-		return logLevel;
-	}
+  public String getLogLevel() {
+    return logLevel;
+  }
 
-	public String getClassName() {
-		return className;
-	}
+  public String getClassName() {
+    return className;
+  }
 
-	public String getBody() {
-		return body;
-	}
+  public String getBody() {
+    return body;
+  }
 }
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
index c2695f2..3718fbd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -22,20 +22,14 @@
  */
 package org.apache.hadoop.chukwa.extraction.hbase;
 
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Iterator;
-import java.util.TimeZone;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
-import org.apache.hadoop.chukwa.Chunk;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -48,7 +42,7 @@
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-    String buffer = new String(recordEntry);
+    String buffer = new String(recordEntry, Charset.forName("UTF-8"));
     JSONObject json = (JSONObject) JSONValue.parse(buffer);
     time = ((Long) json.get("timestamp")).longValue();
     ChukwaRecord record = new ChukwaRecord();
@@ -70,11 +64,9 @@
       user = user + Double.parseDouble(cpu.get("user").toString());
       sys = sys + Double.parseDouble(cpu.get("sys").toString());
       idle = idle + Double.parseDouble(cpu.get("idle").toString());
-      @SuppressWarnings("unchecked")
-      Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
-      while(iterator.hasNext()) {
-        String key = iterator.next();
-        addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+      for(Entry<String, Object> entry : (Set<Map.Entry>) cpu.entrySet()) {
+        String key = entry.getKey();
+        addRecord("cpu." + key + "." + i, String.valueOf(entry.getValue()));
       }
     }
     combined = combined / actualSize;
@@ -94,20 +86,15 @@
 
     record = new ChukwaRecord();
     JSONObject memory = (JSONObject) json.get("memory");
-    @SuppressWarnings("unchecked")
-    Iterator<String> memKeys = memory.keySet().iterator();
-    while (memKeys.hasNext()) {
-      String key = memKeys.next();
-      addRecord("memory." + key, memory.get(key).toString());
+    for(Entry<String, Object> entry : (Set<Map.Entry>) memory.entrySet()) {
+      String key = entry.getKey();
+      addRecord("memory." + key, String.valueOf(entry.getValue()));
     }
 
     record = new ChukwaRecord();
     JSONObject swap = (JSONObject) json.get("swap");
-    @SuppressWarnings("unchecked")
-    Iterator<String> swapKeys = swap.keySet().iterator();
-    while (swapKeys.hasNext()) {
-      String key = swapKeys.next();
-      addRecord("swap." + key, swap.get(key).toString());
+    for(Map.Entry<String, Object> entry : (Set<Map.Entry>) swap.entrySet()) {
+      addRecord("swap." + entry.getKey(), String.valueOf(entry.getValue()));
     }
 
     double rxBytes = 0;
@@ -122,28 +109,30 @@
     JSONArray netList = (JSONArray) json.get("network");
     for (int i = 0; i < netList.size(); i++) {
       JSONObject netIf = (JSONObject) netList.get(i);
-      @SuppressWarnings("unchecked")
-      Iterator<String> keys = netIf.keySet().iterator();
-      while (keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, netIf.get(key).toString());
+      for(Map.Entry<String, Object> entry : (Set<Map.Entry>) netIf.entrySet()) {
+        String key = entry.getKey();
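+        // Missing or non-numeric counters default to zero instead of throwing ClassCastException.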
+        long value = 0;
+        if(entry.getValue() instanceof Long) {
+          value = (Long) entry.getValue();
+        }
+        record.add(key + "." + i, String.valueOf(entry.getValue()));
         if (i != 0) {
           if (key.equals("RxBytes")) {
-            rxBytes = rxBytes + (Long) netIf.get(key);
+            rxBytes = rxBytes + value;
           } else if (key.equals("RxDropped")) {
-            rxDropped = rxDropped + (Long) netIf.get(key);
+            rxDropped = rxDropped + value;
           } else if (key.equals("RxErrors")) {
-            rxErrors = rxErrors + (Long) netIf.get(key);
+            rxErrors = rxErrors + value;
           } else if (key.equals("RxPackets")) {
-            rxPackets = rxPackets + (Long) netIf.get(key);
+            rxPackets = rxPackets + value;
           } else if (key.equals("TxBytes")) {
-            txBytes = txBytes + (Long) netIf.get(key);
+            txBytes = txBytes + value;
           } else if (key.equals("TxCollisions")) {
-            txCollisions = txCollisions + (Long) netIf.get(key);
+            txCollisions = txCollisions + value;
           } else if (key.equals("TxErrors")) {
-            txErrors = txErrors + (Long) netIf.get(key);
+            txErrors = txErrors + value;
           } else if (key.equals("TxPackets")) {
-            txPackets = txPackets + (Long) netIf.get(key);
+            txPackets = txPackets + value;
           }
         }
       }
@@ -168,22 +157,25 @@
     JSONArray diskList = (JSONArray) json.get("disk");
     for (int i = 0; i < diskList.size(); i++) {
       JSONObject disk = (JSONObject) diskList.get(i);
-      Iterator<String> keys = disk.keySet().iterator();
-      while (keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, disk.get(key).toString());
+      for(Entry<String, Object> entry : (Set<Map.Entry>) disk.entrySet()) {
+        String key = entry.getKey();
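+        // Same defensive handling: only accumulate values that are actually Long.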
+        long value = 0;
+        if(entry.getValue() instanceof Long) {
+          value = (Long) entry.getValue();
+        }
+        record.add(key + "." + i, String.valueOf(entry.getValue()));
         if (key.equals("ReadBytes")) {
-          readBytes = readBytes + (Long) disk.get("ReadBytes");
+          readBytes = readBytes + value;
         } else if (key.equals("Reads")) {
-          reads = reads + (Long) disk.get("Reads");
+          reads = reads + value;
         } else if (key.equals("WriteBytes")) {
-          writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+          writeBytes = writeBytes + value;
         } else if (key.equals("Writes")) {
-          writes = writes + (Long) disk.get("Writes");
+          writes = writes + value;
         } else if (key.equals("Total")) {
-          total = total + (Long) disk.get("Total");
+          total = total + value;
         } else if (key.equals("Used")) {
-          used = used + (Long) disk.get("Used");
+          used = used + value;
         }
       }
     }
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
index 9c21bf1..02fe3b7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
@@ -20,21 +20,20 @@
 
 
 import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
 
 import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
 
 public class ClusterConfig {
-  private static Set<String> clusterMap = null;
+  private Set<String> clusterMap = null;
 
   static public String getContents(File aFile) {
     // ...checks on aFile are elided
     StringBuffer contents = new StringBuffer();
 
     try {
-      // use buffering, reading one line at a time
-      // FileReader always assumes default encoding is OK!
-      BufferedReader input = new BufferedReader(new FileReader(aFile));
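+      // Read with an explicit UTF-8 decoder; FileReader silently used the platform default.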
+      BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile), Charset.forName("UTF-8")));
       try {
         String line = null; // not declared within while loop
         /*
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
index 2d84c09..fe90941 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
@@ -24,6 +24,7 @@
 import java.io.InputStreamReader;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -124,12 +125,12 @@
         StringBuilder sb = new StringBuilder();
         String line = null;
         try {
-          BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+          BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
           while ((line = reader.readLine()) != null) {
             sb.append(line + "\n");
           }
           FSDataOutputStream out = fs.create(dest);
-          out.write(sb.toString().getBytes());
+          out.write(sb.toString().getBytes(Charset.forName("UTF-8")));
           out.close();
           reader.close();
         } catch(IOException e) {
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
index 7c8c6a7..19cde5f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
@@ -111,7 +111,7 @@
     return this.id;
   }
 
-  public void SetSeries(List<SeriesMetaData> series) {
+  public void setSeries(List<SeriesMetaData> series) {
     this.series = series;
   }
   
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
index 61dc0b5..74d5e89 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
@@ -89,11 +89,11 @@
   }
 
   public String[] getTokens() {
-    return tokens;
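+    // Return a defensive copy so callers cannot mutate the internal array.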
+    return tokens.clone();
   }
 
   public void setTokens(String[] tokens) {
-    this.tokens = tokens;
+    this.tokens = tokens.clone();
   }
 
   public void tokenize() {
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
index c9413bd..869efa4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
@@ -24,6 +24,7 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.nio.charset.Charset;
 import java.util.Map;
 
 import javax.servlet.ServletException;
@@ -44,11 +45,11 @@
   private final String USER_AGENT = "Mozilla/5.0";
   private final static String SOLR_URL = "chukwa.solr.url";
   private final static Logger LOG = Logger.getLogger(HttpProxy.class);
-  private ChukwaConfiguration conf = new ChukwaConfiguration();
   private String solrUrl = null;
 
   public HttpProxy() {
     super();
+    ChukwaConfiguration conf = new ChukwaConfiguration();
     solrUrl = conf.get(SOLR_URL);
   }
 
@@ -72,7 +73,7 @@
     LOG.info("Response Code : " + responseCode);
 
     BufferedReader in = new BufferedReader(new InputStreamReader(
-        con.getInputStream()));
+        con.getInputStream(), Charset.forName("UTF-8")));
     String inputLine;
     StringBuffer response1 = new StringBuffer();
 
@@ -80,7 +81,7 @@
 
     while ((inputLine = in.readLine()) != null) {
       response1.append(inputLine);
-      sout.write(inputLine.getBytes());
+      sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
     }
     in.close();
 
@@ -131,7 +132,7 @@
     LOG.debug("Response Code : " + responseCode);
 
     BufferedReader in = new BufferedReader(new InputStreamReader(
-        con.getInputStream()));
+        con.getInputStream(), Charset.forName("UTF-8")));
     String inputLine;
     StringBuffer response1 = new StringBuffer();
 
@@ -139,7 +140,7 @@
 
     while ((inputLine = in.readLine()) != null) {
       response1.append(inputLine);
-      sout.write(inputLine.getBytes());
+      sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
     }
     in.close();
 
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
index ceed0df..1441253 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
@@ -20,6 +20,8 @@
 import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
@@ -63,8 +65,8 @@
     Gson gson = new Gson();
     Type stringStringMap = new TypeToken<Map<String, String>>(){}.getType();
     Map<String,String> map = gson.fromJson(buffer, stringStringMap);
-    for(String key : map.keySet()) {
-      request.getSession().setAttribute(key, map.get(key));
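+    // Copy attributes via entrySet() to avoid a map lookup per key.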
+    for(Entry<String, String> entry : map.entrySet()) {
+      request.getSession().setAttribute(entry.getKey(), entry.getValue());
     }
     return Response.ok().build();
   }
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
index ea07797..4524922 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
@@ -39,7 +39,7 @@
 
   private VelocityEngine ve;
   private static Logger LOG = Logger.getLogger(VelocityResolver.class);
-  public static String LOGGER_NAME = VelocityResolver.class.getName();
+  public final static String LOGGER_NAME = VelocityResolver.class.getName();
   
   /**
    * Jersey configuration for setting up Velocity configuration.
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
deleted file mode 100644
index f0a3303..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.sql.SQLException;
-import java.util.Calendar;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Timer;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueInfoProcessor {
-
-  private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
-
-  private int intervalValue = 60;
-  private String torqueServer = null;
-  private String torqueBinDir = null;
-  private String domain = null;
-
-  private TreeMap<String, TreeMap<String, String>> currentHodJobs;
-
-  public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
-    this.intervalValue = interval;
-
-    torqueServer = System.getProperty("TORQUE_SERVER");
-    torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
-    domain = System.getProperty("DOMAIN");
-    currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
-  }
-
-  public void setup(boolean recover) throws Exception {
-  }
-
-  private void getHodJobInfo() throws IOException {
-    StringBuffer sb = new StringBuffer();
-    sb.append(torqueBinDir).append("/qstat -a");
-
-    String[] getQueueInfoCommand = new String[3];
-    getQueueInfoCommand[0] = "ssh";
-    getQueueInfoCommand[1] = torqueServer;
-    getQueueInfoCommand[2] = sb.toString();
-
-    String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
-        + " " + getQueueInfoCommand[2];
-    ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
-
-    Process p = pb.start();
-
-    Timer timeout = new Timer();
-    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
-    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
-    BufferedReader result = new BufferedReader(new InputStreamReader(p
-        .getInputStream()));
-    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
-        command, true);
-    errorHandler.start();
-
-    String line = null;
-    boolean start = false;
-    TreeSet<String> jobsInTorque = new TreeSet<String>();
-    while ((line = result.readLine()) != null) {
-      if (line.startsWith("---")) {
-        start = true;
-        continue;
-      }
-
-      if (start) {
-        String[] items = line.split("\\s+");
-        if (items.length >= 10) {
-          String hodIdLong = items[0];
-          String hodId = hodIdLong.split("[.]")[0];
-          String userId = items[1];
-          String numOfMachine = items[5];
-          String status = items[9];
-          jobsInTorque.add(hodId);
-          if (!currentHodJobs.containsKey(hodId)) {
-            TreeMap<String, String> aJobData = new TreeMap<String, String>();
-
-            aJobData.put("userId", userId);
-            aJobData.put("numOfMachine", numOfMachine);
-            aJobData.put("traceCheckCount", "0");
-            aJobData.put("process", "0");
-            aJobData.put("status", status);
-            currentHodJobs.put(hodId, aJobData);
-          } else {
-            TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-            aJobData.put("status", status);
-            currentHodJobs.put(hodId, aJobData);
-          }// if..else
-        }
-      }
-    }// while
-
-    try {
-      errorHandler.join();
-    } catch (InterruptedException ie) {
-      log.error(ie.getMessage());
-    }
-    timeout.cancel();
-
-    Set<String> currentHodJobIds = currentHodJobs.keySet();
-    Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
-    TreeSet<String> finishedHodIds = new TreeSet<String>();
-    while (currentHodJobIdsIt.hasNext()) {
-      String hodId = currentHodJobIdsIt.next();
-      if (!jobsInTorque.contains(hodId)) {
-        TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-        String process = aJobData.get("process");
-        if (process.equals("0") || process.equals("1")) {
-          aJobData.put("status", "C");
-        } else {
-          finishedHodIds.add(hodId);
-        }
-      }
-    }// while
-
-    Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
-    while (finishedHodIdsIt.hasNext()) {
-      String hodId = finishedHodIdsIt.next();
-      currentHodJobs.remove(hodId);
-    }
-
-  }
-
-  private boolean loadQstatData(String hodId) throws IOException, SQLException {
-    TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-    String userId = aJobData.get("userId");
-
-    StringBuffer sb = new StringBuffer();
-    sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
-    String[] qstatCommand = new String[3];
-    qstatCommand[0] = "ssh";
-    qstatCommand[1] = torqueServer;
-    qstatCommand[2] = sb.toString();
-
-    String command = qstatCommand[0] + " " + qstatCommand[1] + " "
-        + qstatCommand[2];
-    ProcessBuilder pb = new ProcessBuilder(qstatCommand);
-    Process p = pb.start();
-
-    Timer timeout = new Timer();
-    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
-    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
-    BufferedReader result = new BufferedReader(new InputStreamReader(p
-        .getInputStream()));
-    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
-        command, false);
-    errorHandler.start();
-    String line = null;
-    String hosts = null;
-    long startTimeValue = -1;
-    long endTimeValue = Calendar.getInstance().getTimeInMillis();
-    long executeTimeValue = Calendar.getInstance().getTimeInMillis();
-    boolean qstatfinished;
-
-    while ((line = result.readLine()) != null) {
-      if (line.indexOf("ctime") >= 0) {
-        String startTime = line.split("=")[1].trim();
-        // Tue Sep 9 23:44:29 2008
-        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-        Date startTimeDate;
-        try {
-          startTimeDate = sdf.parse(startTime);
-          startTimeValue = startTimeDate.getTime();
-        } catch (ParseException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-      }
-      if (line.indexOf("mtime") >= 0) {
-        String endTime = line.split("=")[1].trim();
-        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-        Date endTimeDate;
-        try {
-          endTimeDate = sdf.parse(endTime);
-          endTimeValue = endTimeDate.getTime();
-        } catch (ParseException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-      }
-      if (line.indexOf("etime") >= 0) {
-        String executeTime = line.split("=")[1].trim();
-        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-        Date executeTimeDate;
-        try {
-          executeTimeDate = sdf.parse(executeTime);
-          executeTimeValue = executeTimeDate.getTime();
-        } catch (ParseException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-      }
-      if (line.indexOf("exec_host") >= 0) {
-        hosts = line.split("=")[1].trim();
-      }
-    }
-
-    if (hosts != null && startTimeValue >= 0) {
-      String[] items2 = hosts.split("[+]");
-      int num = 0;
-      for (int i = 0; i < items2.length; i++) {
-        String machinetmp = items2[i];
-        if (machinetmp.length() > 3) {
-          String machine = items2[i].substring(0, items2[i].length() - 2);
-          StringBuffer data = new StringBuffer();
-          data.append("HodId=").append(hodId);
-          data.append(", Machine=").append(machine);
-          if (domain != null) {
-            data.append(".").append(domain);
-          }
-          log.info(data);
-          num++;
-        }
-      }
-      Timestamp startTimedb = new Timestamp(startTimeValue);
-      Timestamp endTimedb = new Timestamp(endTimeValue);
-      StringBuffer data = new StringBuffer();
-      long timeQueued = executeTimeValue - startTimeValue;
-      data.append("HodID=").append(hodId);
-      data.append(", UserId=").append(userId);
-      data.append(", StartTime=").append(startTimedb);
-      data.append(", TimeQueued=").append(timeQueued);
-      data.append(", NumOfMachines=").append(num);
-      data.append(", EndTime=").append(endTimedb);
-      log.info(data);
-      qstatfinished = true;
-
-    } else {
-
-      qstatfinished = false;
-    }
-
-    try {
-      errorHandler.join();
-    } catch (InterruptedException ie) {
-      log.error(ie.getMessage());
-    }
-    result.close();
-    timeout.cancel();
-
-    return qstatfinished;
-  }
-
-  private boolean loadTraceJobData(String hodId) throws IOException,
-      SQLException {
-    TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-    String userId = aJobData.get("userId");
-
-    StringBuffer sb = new StringBuffer();
-    sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
-    String[] traceJobCommand = new String[3];
-    traceJobCommand[0] = "ssh";
-    traceJobCommand[1] = torqueServer;
-    traceJobCommand[2] = sb.toString();
-
-    String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
-        + traceJobCommand[2];
-    ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
-
-    Process p = pb.start();
-
-    Timer timeout = new Timer();
-    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
-    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
-    BufferedReader result = new BufferedReader(new InputStreamReader(p
-        .getInputStream()));
-    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
-        command, false);
-    errorHandler.start();
-    String line = null;
-    String exit_status = null;
-    String hosts = null;
-    long timeQueued = -1;
-    long startTimeValue = -1;
-    long endTimeValue = -1;
-    boolean findResult = false;
-
-    while ((line = result.readLine()) != null && !findResult) {
-      if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
-          && line.indexOf("qtime") >= 0) {
-        TreeMap<String, String> jobData = new TreeMap<String, String>();
-        String[] items = line.split("\\s+");
-        for (int i = 0; i < items.length; i++) {
-          String[] items2 = items[i].split("=");
-          if (items2.length >= 2) {
-            jobData.put(items2[0], items2[1]);
-          }
-
-        }
-        String startTime = jobData.get("ctime");
-        startTimeValue = Long.valueOf(startTime);
-        startTimeValue = startTimeValue - startTimeValue % (60);
-        Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
-
-        String queueTime = jobData.get("qtime");
-        long queueTimeValue = Long.valueOf(queueTime);
-
-        String sTime = jobData.get("start");
-        long sTimeValue = Long.valueOf(sTime);
-
-        timeQueued = sTimeValue - queueTimeValue;
-
-        String endTime = jobData.get("end");
-        endTimeValue = Long.valueOf(endTime);
-        endTimeValue = endTimeValue - endTimeValue % (60);
-        Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
-
-        exit_status = jobData.get("Exit_status");
-        hosts = jobData.get("exec_host");
-        String[] items2 = hosts.split("[+]");
-        int num = 0;
-        for (int i = 0; i < items2.length; i++) {
-          String machinetemp = items2[i];
-          if (machinetemp.length() >= 3) {
-            String machine = items2[i].substring(0, items2[i].length() - 2);
-            StringBuffer data = new StringBuffer();
-            data.append("HodId=").append(hodId);
-            data.append(", Machine=").append(machine);
-            if (domain != null) {
-              data.append(".").append(domain);
-            }
-            log.info(data.toString());
-            num++;
-          }
-        }
-
-        StringBuffer data = new StringBuffer();
-        data.append("HodID=").append(hodId);
-        data.append(", UserId=").append(userId);
-        data.append(", Status=").append(exit_status);
-        data.append(", TimeQueued=").append(timeQueued);
-        data.append(", StartTime=").append(startTimedb);
-        data.append(", EndTime=").append(endTimedb);
-        data.append(", NumOfMachines=").append(num);
-        log.info(data.toString());
-        findResult = true;
-        log.debug(" hod info for job " + hodId + " has been loaded ");
-      }// if
-
-    }// while
-
-    try {
-      errorHandler.join();
-    } catch (InterruptedException ie) {
-      log.error(ie.getMessage());
-    }
-
-    timeout.cancel();
-    boolean tracedone = false;
-    if (!findResult) {
-
-      String traceCheckCount = aJobData.get("traceCheckCount");
-      int traceCheckCountValue = Integer.valueOf(traceCheckCount);
-      traceCheckCountValue = traceCheckCountValue + 1;
-      aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
-
-      log.debug("did not find tracejob info for job " + hodId + ", after "
-          + traceCheckCountValue + " times checking");
-      if (traceCheckCountValue >= 2) {
-        tracedone = true;
-      }
-    }
-    boolean finished = findResult | tracedone;
-    return finished;
-  }
-
-  private void process_data() throws SQLException {
-
-    long currentTime = System.currentTimeMillis();
-    currentTime = currentTime - currentTime % (60 * 1000);
-
-    Set<String> hodIds = currentHodJobs.keySet();
-
-    Iterator<String> hodIdsIt = hodIds.iterator();
-    while (hodIdsIt.hasNext()) {
-      String hodId = hodIdsIt.next();
-      TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-      String status = aJobData.get("status");
-      String process = aJobData.get("process");
-      if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
-        try {
-          boolean result = loadQstatData(hodId);
-          if (result) {
-            aJobData.put("process", "1");
-            currentHodJobs.put(hodId, aJobData);
-          }
-        } catch (IOException ioe) {
-          log.error("load qsat data Error:" + ioe.getMessage());
-
-        }
-      }
-      if (!process.equals("2") && status.equals("C")) {
-        try {
-          boolean result = loadTraceJobData(hodId);
-
-          if (result) {
-            aJobData.put("process", "2");
-            currentHodJobs.put(hodId, aJobData);
-          }
-        } catch (IOException ioe) {
-          log.error("loadTraceJobData Error:" + ioe.getMessage());
-        }
-      }// if
-
-    } // while
-
-  }
-
-  private void handle_jobData() throws SQLException {
-    try {
-      getHodJobInfo();
-    } catch (IOException ex) {
-      log.error("getQueueInfo Error:" + ex.getMessage());
-      return;
-    }
-    try {
-      process_data();
-    } catch (SQLException ex) {
-      log.error("process_data Error:" + ex.getMessage());
-      throw ex;
-    }
-  }
-
-  public void run_forever() throws SQLException {
-    while (true) {
-      handle_jobData();
-      try {
-        log.debug("sleeping ...");
-        Thread.sleep(this.intervalValue * 1000);
-      } catch (InterruptedException e) {
-        log.error(e.getMessage());
-      }
-    }
-  }
-
-  public void shutdown() {
-  }
-}
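
The qstat parser deleted above constructed a fresh SimpleDateFormat inside every
branch of its read loop and handled ParseException with an auto-generated
printStackTrace(). A minimal sketch of the same timestamp parsing done in one
place, with the formatter scoped per call (SimpleDateFormat is not thread-safe)
and failures reported; the class name and sample line are illustrative only:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class QstatTimeParser {
  // Torque qstat prints times like "Tue Sep 9 23:44:29 2008".
  private static final String QSTAT_DATE_PATTERN = "EEE MMM d HH:mm:ss yyyy";

  // Returns epoch millis for a "key = value" qstat line, or -1 on bad input.
  public static long parseTimeField(String line) {
    // Locale pinned so month/day names parse regardless of platform locale;
    // one formatter per call because SimpleDateFormat is not thread-safe.
    SimpleDateFormat sdf = new SimpleDateFormat(QSTAT_DATE_PATTERN, Locale.US);
    String value = line.split("=")[1].trim();
    try {
      Date parsed = sdf.parse(value);
      return parsed.getTime();
    } catch (ParseException e) {
      System.err.println("cannot parse qstat time: " + value);
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseTimeField("ctime = Tue Sep 9 23:44:29 2008"));
  }
}
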
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
deleted file mode 100644
index 8ea645e..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.util.TimerTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueTimerTask extends TimerTask {
-  private Process ps = null;
-  private String command;
-
-  private static Log log = LogFactory.getLog(TorqueTimerTask.class);
-  // public static int timeoutInterval=300;
-  public static int timeoutInterval = 180;
-
-  public TorqueTimerTask() {
-    super();
-    // TODO Auto-generated constructor stub
-  }
-
-  public TorqueTimerTask(Process process, String command) {
-    super();
-    this.ps = process;
-    this.command = command;
-
-  }
-
-  public void run() {
-    ps.destroy();
-    log.error("torque command: " + command + " timed out");
-
-  }
-
-}
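
The TorqueTimerTask class removed above implemented a process watchdog: a
TimerTask that destroys the Process unless the caller cancels the timer first.
On the Java 7 target this pattern still looks the same; a minimal
self-contained sketch (class name and message text are illustrative):

import java.util.Timer;
import java.util.TimerTask;

public class ProcessWatchdog {
  // Destroys the process if it is still running after timeoutSeconds.
  // The returned Timer must be cancelled by the caller once the process
  // has finished normally.
  public static Timer watch(final Process ps, final String command,
      long timeoutSeconds) {
    Timer timer = new Timer(true); // daemon thread, will not block JVM exit
    timer.schedule(new TimerTask() {
      public void run() {
        ps.destroy();
        System.err.println("torque command: " + command + " timed out");
      }
    }, timeoutSeconds * 1000L);
    return timer;
  }
}

Callers pair watch(...) with Timer.cancel() once the process exits, exactly as
the deleted loadQstatData and loadTraceJobData methods did.
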
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
index 7ed3148..ff2a022 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
@@ -20,10 +20,11 @@
 
 
 import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
 
 public class ClusterConfig {
-  public static final HashMap<String, String> clusterMap = new HashMap<String, String>();
+  private HashMap<String, String> clusterMap = new HashMap<String, String>();
   private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator;
 
   static public String getContents(File aFile) {
@@ -31,9 +32,7 @@
     StringBuffer contents = new StringBuffer();
 
     try {
-      // use buffering, reading one line at a time
-      // FileReader always assumes default encoding is OK!
-      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
       try {
         String line = null; // not declared within while loop
         /*
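
This hunk addresses findbugs' default-encoding warning: FileReader always
decodes with the platform default charset, so the same configuration file could
read differently from machine to machine. A minimal JDK-only sketch of the
explicit-charset replacement pattern:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

public class Utf8FileRead {
  public static String getContents(File aFile) throws IOException {
    StringBuilder contents = new StringBuilder();
    // Explicit charset: FileReader would silently use the platform default.
    BufferedReader input = new BufferedReader(new InputStreamReader(
        new FileInputStream(aFile), Charset.forName("UTF-8")));
    try {
      String line;
      while ((line = input.readLine()) != null) {
        contents.append(line);
        contents.append(System.getProperty("line.separator"));
      }
    } finally {
      input.close(); // on Java 7, try-with-resources is the tidier equivalent
    }
    return contents.toString();
  }
}
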
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
index c655e24..70a80c0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.util;
 
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Calendar;
@@ -29,7 +30,6 @@
 public class HBaseUtil {
   private static Logger LOG = Logger.getLogger(HBaseUtil.class);
   
-  static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
   static MessageDigest md5 = null;
   static {
     try {
@@ -50,8 +50,9 @@
   }
 
   public static byte[] buildKey(long time, String primaryKey) {
+    Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     c.setTimeInMillis(time);
-    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
     byte[] pk = getHash(primaryKey);
     byte[] key = new byte[12];
     System.arraycopy(day, 0, key, 0, day.length);
@@ -60,8 +61,9 @@
   }
   
   public static byte[] buildKey(long time, String primaryKey, String source) {
+    Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     c.setTimeInMillis(time);
-    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
     byte[] pk = getHash(primaryKey);
     byte[] src = getHash(source);
     byte[] key = new byte[12];
@@ -73,7 +75,7 @@
   
   private static byte[] getHash(String key) {
     byte[] hash = new byte[5];
-    System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5);
+    System.arraycopy(md5.digest(key.getBytes(Charset.forName("UTF-8"))), 0, hash, 0, 5);
     return hash;
   }
 }
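
Two findbugs patterns are fixed in this file: a Calendar kept in a static field
is mutable shared state and not safe under concurrent buildKey calls, and
getBytes() with no argument uses the platform default encoding, so row keys
could differ across machines. A reduced sketch of the corrected day-prefix
computation (class and method names are illustrative):

import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.TimeZone;

public class DayKey {
  // One Calendar per call: Calendar is mutable, so a shared static
  // instance would corrupt keys under concurrent access.
  public static byte[] buildDayPrefix(long time) {
    Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    c.setTimeInMillis(time);
    // Explicit charset keeps the key bytes identical on every platform.
    return Integer.toString(c.get(Calendar.DAY_OF_YEAR))
        .getBytes(Charset.forName("UTF-8"));
  }
}
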
diff --git a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
index 26e4beb..c1cf37f 100644
--- a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
+++ b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
@@ -43,26 +43,6 @@
     }
   }
 
-  public void testWrongVersion() {
-    ChunkBuilder cb = new ChunkBuilder();
-    cb.addRecord("foo".getBytes());
-    cb.addRecord("bar".getBytes());
-    cb.addRecord("baz".getBytes());
-    Chunk c = cb.getChunk();
-    DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
-    try {
-      c.write(ob);
-      DataInputBuffer ib = new DataInputBuffer();
-      ib.reset(ob.getData(), c.getSerializedSizeEstimate());
-      // change current chunkImpl version
-      ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1;
-      ChunkImpl.read(ib);
-      fail("Should have raised an IOexception");
-    } catch (IOException e) {
-      // right behavior, do nothing
-    }
-  }
-  
   public void testTag() {
     ChunkBuilder cb = new ChunkBuilder();
     cb.addRecord("foo".getBytes());
diff --git a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
index 93500ff..99d8dc1 100644
--- a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
+++ b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
@@ -172,7 +172,8 @@
       conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
       conf.set("chukwaAgent.checkpoint.interval", "10000");
       int portno = conf.getInt("chukwaAgent.control.port", agentPort);
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent();
+      agent.start();
       conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
       conn.start();      
       sender = new ChukwaHttpSender(conf);
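
From this hunk onward, every test switches from the removed public ChukwaAgent
constructor to the singleton accessor plus an explicit start(). A minimal
sketch of the lifecycle the tests now share; only getAgent, start, and shutdown
are taken from the hunks themselves, the surrounding test is illustrative:

import junit.framework.TestCase;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;

public class AgentLifecycleTest extends TestCase {
  private ChukwaAgent agent;

  protected void setUp() throws Exception {
    Configuration conf = new Configuration();
    conf.set("chukwaAgent.control.port", "0"); // any free port
    agent = ChukwaAgent.getAgent(conf);        // singleton accessor
    agent.start();                             // explicit start replaces the constructor side effect
  }

  protected void tearDown() throws Exception {
    agent.shutdown(); // release the singleton for the next test
  }
}

The explicit shutdown() in tearDown matters because the agent is a singleton;
the AlreadyRunningException added to test signatures below suggests a second
start() without an intervening shutdown() is refused.
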
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
index a039bc6..5b163c9 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
@@ -59,7 +59,8 @@
 	    conf.setInt("chukwaAgent.http.port", 9090);
 	    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);	    
 	    
-	    agent = new ChukwaAgent(conf);
+	    agent = ChukwaAgent.getAgent(conf);
+	    agent.start();
 	}
 	
 	public void testJMXAdaptor() {
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
index 367484e..39f151b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
@@ -57,7 +57,8 @@
     conf.setInt("chukwaAgent.http.port", 9090);
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
 
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
 
     assertEquals(0, agent.adaptorCount());
     System.out.println("adding jmx adaptor");
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
index 466053b..948ec5a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
@@ -63,7 +63,8 @@
   public void resendAfterStop(String adaptor)  throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     String ADAPTORID = "adaptor_test" + System.currentTimeMillis(); 
     String STR = "test data";
     int PORTNO = 9878;
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
index 717125b..5748c9b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
@@ -56,7 +56,8 @@
     conf.setInt("chukwaAgent.control.port", 0);
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
     
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     File emptyDir = new File(baseDir, "emptyDir2");
     createEmptyDir(emptyDir);
     
@@ -90,7 +91,8 @@
     anOldFile.deleteOnExit();
     aNewFile.deleteOnExit();
     anOldFile.setLastModified(10);//just after epoch
-    agent = new ChukwaAgent(conf); //restart agent.
+    agent = ChukwaAgent.getAgent(conf); //restart agent.
+    agent.start();
     
    Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
    assertTrue(aNewFile.exists());
@@ -135,7 +137,8 @@
     while(retry) {
       try {
         retry = false;
-        agent = new ChukwaAgent(conf);
+        agent = ChukwaAgent.getAgent(conf);
+        agent.start();
       } catch(Exception e) {
         retry = true;
       }
@@ -167,11 +170,12 @@
     anOldFile.deleteOnExit();
     aNewFile.deleteOnExit();
     anOldFile.setLastModified(10);//just after epoch
-    agent = new ChukwaAgent(conf); //restart agent.
-    
+    agent = ChukwaAgent.getAgent(conf); //restart agent.
+    agent.start();
+
    Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
    assertTrue(aNewFile.exists());
-   
+
     //make sure we started tailing the new, not the old, file.
     for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
       System.out.println(adaptors.getKey() +": " + adaptors.getValue());
@@ -182,7 +186,7 @@
     Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
     assertEquals(4, agent.adaptorCount());
     agent.shutdown();
-    
+
     nukeDirContents(checkpointDir);//nuke dir
     checkpointDir.delete();
     emptyDir.delete();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
index 11a084a..1af52d0 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
@@ -35,7 +35,8 @@
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
   }
 
   @Override
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
index eafa12d..fbe249a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
@@ -55,7 +55,8 @@
   public void testOnce()  throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     
     assertEquals(0, agent.adaptorCount());
 
@@ -75,7 +76,8 @@
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
 
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     for(int i=0; i < tests; ++i) {
       if(i % 100 == 0)
         System.out.println("buzzed " + i + " times");
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
index bcd940c..4bbf206 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
@@ -24,6 +24,7 @@
 import java.net.Socket;
 
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
 import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -34,8 +35,9 @@
 public class TestHeartbeatAdaptor extends TestCase {
   private volatile boolean shutdown = false;
   private final int port = 4321;
-  public void testPingAdaptor() throws IOException, InterruptedException{
+  public void testPingAdaptor() throws IOException, InterruptedException, AlreadyRunningException{
     ChukwaAgent agent = ChukwaAgent.getAgent();
+    agent.start();
     Configuration conf = agent.getConfiguration();
     conf.set("chukwa.http.writer.host", "localhost");
     conf.set("chukwa.http.writer.port", String.valueOf(port));
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
index 1e0f234..65add51 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
@@ -44,7 +44,8 @@
     
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     File testFile = makeTestFile("chukwaTest", 80,baseDir);
     String adaptorId = agent
         .processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
index 1c68a1a..fc04f25 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
@@ -36,7 +36,8 @@
     try {
       Configuration conf = new ChukwaConfiguration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
 
       FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
 
@@ -79,7 +80,8 @@
 
       Configuration conf = new ChukwaConfiguration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       // Remove any adaptor left over from previous run
 
       ChukwaAgentController cli = new ChukwaAgentController("localhost", agent.getControllerPort());
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
index 570e7f4..d622b5d 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
@@ -49,7 +49,8 @@
 		ChukwaConfiguration cc = new ChukwaConfiguration();
 		cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 18); // small in order to have hasMoreData=true
 																	 // (with 26 letters we should have 2 chunks)
-		agent = new ChukwaAgent(cc);
+		agent = ChukwaAgent.getAgent(cc);
+		agent.start();
 		
 		ChunkCatcherConnector chunks = new ChunkCatcherConnector();
 	    chunks.start();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
index 40479b5..2a82e79 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
@@ -50,7 +50,8 @@
       ChukwaConfiguration cc = new ChukwaConfiguration();
       cc.set("chukwaAgent.control.port", "0");
       cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
-      ChukwaAgent agent = new ChukwaAgent(cc);
+      ChukwaAgent agent = ChukwaAgent.getAgent(cc);
+      agent.start();
       int portno = agent.getControllerPort();
       while (portno == -1) {
         Thread.sleep(1000);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
index fbbfd94..4590ef3 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
@@ -59,7 +59,8 @@
    */
   @Before
   public void setUp() throws Exception {
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     chunks = new ChunkCatcherConnector();
     chunks.start();
 
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
index a71790d..3bda707 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
-
 import java.io.*;
 
 import junit.framework.TestCase;
@@ -36,7 +35,7 @@
   ChunkCatcherConnector chunks;
   Configuration conf = new Configuration();
   File baseDir, testFile;
-  
+
   public TestFileTailingAdaptors() throws IOException {
     chunks = new ChunkCatcherConnector();
     chunks.start();
@@ -46,13 +45,14 @@
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
     conf.set("chukwaAgent.control.port", "0");
 
-    testFile = makeTestFile("chukwaCrSepTest", 80,baseDir);
+    testFile = makeTestFile("chukwaCrSepTest", 80, baseDir);
 
   }
 
   public void testCrSepAdaptor() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     // Remove any adaptor left over from previous run
 
     // sleep for some time to make sure we don't get chunk from existing streams
@@ -84,81 +84,90 @@
     agent.shutdown();
     Thread.sleep(2000);
   }
-  
-  public void testRepeatedlyOnBigFile() throws IOException,
-  ChukwaAgent.AlreadyRunningException, InterruptedException {
-    int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
 
-    ChukwaAgent agent = new ChukwaAgent(conf);
-    for(int i=0; i < tests; ++i) {
-      if(i % 100 == 0)
+  public void testRepeatedlyOnBigFile() throws IOException,
+      ChukwaAgent.AlreadyRunningException, InterruptedException {
+    int tests = 10; // SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
+
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
+    for (int i = 0; i < tests; ++i) {
+      if (i % 100 == 0)
         System.out.println("buzzed " + i + " times");
-      
+
       assertEquals(0, agent.adaptorCount());
-      agent.processAddCommand("add adaptor_test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
+      agent
+          .processAddCommand("add adaptor_test = filetailer.FileTailingAdaptor raw "
+              + testFile.getCanonicalPath() + " 0");
       assertEquals(1, agent.adaptorCount());
       Chunk c = chunks.waitForAChunk();
       String dat = new String(c.getData());
       assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
       assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));
       assertTrue(c.getDataType().equals("raw"));
-      if(agent.adaptorCount() > 0)
+      if (agent.adaptorCount() > 0)
         agent.stopAdaptor("adaptor_test", false);
     }
     agent.shutdown();
   }
 
-  
-  public void testOffsetInAdaptorName() throws IOException, ChukwaAgent.AlreadyRunningException,
-  InterruptedException{
-    File testFile = makeTestFile("foo", 120,baseDir);
-    ChukwaAgent agent = new ChukwaAgent(conf);
+  public void testOffsetInAdaptorName() throws IOException,
+      ChukwaAgent.AlreadyRunningException, InterruptedException {
+    File testFile = makeTestFile("foo", 120, baseDir);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     assertEquals(0, agent.adaptorCount());
-    agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
+    agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw "
+        + testFile.getCanonicalPath() + " 0");
     assertEquals(1, agent.adaptorCount());
     Thread.sleep(2000);
-    agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
+    agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw "
+        + testFile.getCanonicalPath() + " 0");
     assertEquals(1, agent.adaptorCount());
     chunks.clear();
     agent.shutdown();
   }
-  
+
   /**
-   * Test that LWFTAdaptor updates lastSlurpTime so that FileTailingAdaptor
-   * does not trigger an infinite loop and that slurp() is not called by
+   * Test that LWFTAdaptor updates lastSlurpTime so that FileTailingAdaptor does
+   * not trigger an infinite loop and that slurp() is not called by
    * FileTailingAdaptor if file is not updated (see CHUKWA-668)
+   * 
    * @throws IOException
    * @throws ChukwaAgent.AlreadyRunningException
    * @throws InterruptedException
    */
-  public void testSlurpTimeUpdated() throws IOException, ChukwaAgent.AlreadyRunningException,
-  InterruptedException{
-	  ChukwaAgent agent = new ChukwaAgent(conf);
-	  File testFile = makeTestFile("fooSlurp", 0,baseDir);
-	  long startTime = System.currentTimeMillis();
-	  String adaptorId = agent.processAddCommand("add adaptor_test =" +
-	  		"filetailer.FileTailingAdaptor slurp " +testFile.getCanonicalPath() + " 0");
-	  FileTailingAdaptor fta = (FileTailingAdaptor)agent.getAdaptor( adaptorId);
-	  Thread.sleep(500);
-	  long initializedSlurpTimeValue = fta.lastSlurpTime; 
-	  assertTrue( initializedSlurpTimeValue > startTime); // initialized to current time
-	  
-	  makeTestFile("fooSlurp", 2,baseDir);
-	  Chunk c = chunks.waitForAChunk();
+  public void testSlurpTimeUpdated() throws IOException,
+      ChukwaAgent.AlreadyRunningException, InterruptedException {
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
+    File testFile = makeTestFile("fooSlurp", 0, baseDir);
+    long startTime = System.currentTimeMillis();
+    String adaptorId = agent.processAddCommand("add adaptor_test ="
+        + "filetailer.FileTailingAdaptor slurp " + testFile.getCanonicalPath()
+        + " 0");
+    FileTailingAdaptor fta = (FileTailingAdaptor) agent.getAdaptor(adaptorId);
+    Thread.sleep(500);
+    long initializedSlurpTimeValue = fta.lastSlurpTime;
+    assertTrue(initializedSlurpTimeValue > startTime); // initialized to current
+                                                       // time
 
-	  Thread.sleep(2000);
-	  // lastSlurpTime has been updated because a slurp was done
-	  long secondSlurpTimeValue = fta.lastSlurpTime;
-	  assertTrue( secondSlurpTimeValue > initializedSlurpTimeValue);
-	  assertEquals( fta.fileReadOffset, c.getData().length);
-	  assertEquals( fta.fileReadOffset, fta.reader.length());
-	  
-	  Thread.sleep(2000);
-	  // ensure we don't try to slurp if file is not updated
-	  assertEquals( fta.lastSlurpTime, secondSlurpTimeValue);
-	  
-	  if(agent.adaptorCount() > 0)
-	        agent.stopAdaptor("adaptor_test", false);
-	    agent.shutdown();
+    makeTestFile("fooSlurp", 2, baseDir);
+    Chunk c = chunks.waitForAChunk();
+
+    Thread.sleep(2000);
+    // lastSlurpTime has been updated because a slurp was done
+    long secondSlurpTimeValue = fta.lastSlurpTime;
+    assertTrue(secondSlurpTimeValue > initializedSlurpTimeValue);
+    assertEquals(fta.fileReadOffset, c.getData().length);
+    assertEquals(fta.fileReadOffset, fta.reader.length());
+
+    Thread.sleep(2000);
+    // ensure we don't try to slurp if file is not updated
+    assertEquals(fta.lastSlurpTime, secondSlurpTimeValue);
+
+    if (agent.adaptorCount() > 0)
+      agent.stopAdaptor("adaptor_test", false);
+    agent.shutdown();
   }
 }
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
index ba0e15d..6f9e2d3 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
@@ -41,6 +41,7 @@
   public void testLogRotate() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
     ChukwaAgent agent = ChukwaAgent.getAgent();
+    agent.start();
     // Remove any adaptor left over from previous run
     ChukwaConfiguration cc = new ChukwaConfiguration();
     int portno = cc.getInt("chukwaAgent.control.port", 9093);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
index 7c5c2d3..f4ec73f 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
@@ -54,7 +54,8 @@
     Configuration conf = new ChukwaConfiguration();
     conf.set("", "org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector");
     try {
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       Thread.sleep(2000);
       Map<String, String> adaptorList = agent.getAdaptorList();
       for(String id : adaptorList.keySet()) {
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
index 935ddd1..d2334d8 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
@@ -64,7 +64,8 @@
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
 
     File testFile = makeTestFile("chukwaRawTest", 80, 
         new File(System.getProperty("test.build.data", "/tmp")));
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
index a2ee740..363b1ce 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
@@ -46,7 +46,8 @@
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     File testFile = makeTestFile();
     int startOffset = 0; // skip first line
     String adaptorId = agent
@@ -81,7 +82,8 @@
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     File testFile = makeTestFile();
     int startOffset = 0;
     String adaptorId = agent
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
index 07c160c..dd5b702 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
@@ -34,7 +34,8 @@
     try {
       Configuration conf = new Configuration();
       conf.setInt("chukwaAgent.control.port", 0);
-      ChukwaAgent agent = new ChukwaAgent(conf);
+      ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
 
@@ -63,7 +64,8 @@
     try {
       Configuration conf = new Configuration();
       conf.setInt("chukwaAgent.control.port", 0);
-      ChukwaAgent agent = new ChukwaAgent(conf);
+      ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       int count = agent.adaptorCount();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
index 8f4eb73..c960b4d 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
@@ -57,7 +57,8 @@
       checkpointDir.deleteOnExit();
       conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getAbsolutePath());
 
-      ChukwaAgent agent = new ChukwaAgent(conf);
+      ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       assertEquals(1, agent.adaptorCount());// check that we processed initial
@@ -80,7 +81,8 @@
       ps.close();
 
       System.out.println("---------------------restarting");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       conn = new ConsoleOutConnector(agent, true);
       conn.start();
       assertEquals(2, agent.adaptorCount());// check that we processed initial
@@ -113,7 +115,8 @@
       conf.setInt("chukwaAgent.control.port", 0);
 
       System.out.println("\n\n===checkpoints enabled, dir does not exist:");
-      ChukwaAgent agent = new ChukwaAgent(conf);
+      ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       assertEquals(0, agent.getAdaptorList().size());
       agent.shutdown();
       Thread.sleep(2000);
@@ -123,7 +126,8 @@
 
       System.out
           .println("\n\n===checkpoints enabled, dir exists but is empty:");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       assertEquals(0, agent.getAdaptorList().size());
       agent.shutdown();
       Thread.sleep(2000);
@@ -133,7 +137,8 @@
       System.out
           .println("\n\n===checkpoints enabled, dir exists with zero-length file:");
       (new File(NONCE_DIR, "chukwa_checkpoint_0")).createNewFile();
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       assertEquals(0, agent.getAdaptorList().size());
       agent.processAddCommand("ADD org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor testdata  0");
       agent.shutdown();
@@ -142,7 +147,8 @@
 
       System.out
           .println("\n\n===checkpoints enabled, dir exists with valid checkpoint");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       assertEquals(1, agent.getAdaptorList().size());
       agent.shutdown();
       Thread.sleep(2000);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java
index 898e03c..bddbc8e 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java
@@ -86,7 +86,8 @@
     conf.set(SSL_PROTOCOL, sslProtocol);
     */
     //start agent, which starts chukwa rest server
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     System.out.println("Started ChukwaRestServer");
     testSecureRestAdaptor(agent);
     agent.shutdown();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
index 66b85e7..051163b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
@@ -45,9 +45,9 @@
   
   @Override
   protected void setUp() throws AlreadyRunningException {
-    agent = ChukwaAgent.getAgent();
     if(agent == null){
-      agent = new ChukwaAgent();
+      agent = ChukwaAgent.getAgent();
+      agent.start();
     }
     conf = agent.getConfiguration();
     conf.set(CHUNK_QUEUE_LIMIT, Integer.toString(QUEUE_LIMIT));
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
index 6880fce..a34d221 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
@@ -36,7 +36,8 @@
     try {
       Configuration conf = new Configuration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       String l = agent
@@ -68,7 +69,8 @@
     try {
       Configuration conf = new Configuration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       String name = agent
@@ -99,7 +101,8 @@
     try {
       Configuration conf = new Configuration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       String n = agent
@@ -129,7 +132,8 @@
   public void testStopAll() throws Exception{
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     ChunkCatcherConnector chunks = new ChunkCatcherConnector();
     chunks.start();
     agent.processAddCommand(
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
index 0d5bbd0..fa2d5f1 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
@@ -56,6 +56,7 @@
 
   protected void setUp() throws Exception {
     agent = ChukwaAgent.getAgent();
+    agent.start();
 
     ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
     servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
index a3ed9f6..2cfd72c 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
@@ -46,7 +46,8 @@
     Server collectorServ = TestDelayedAcks.startCollectorOnPort(conf, PORTNO, collector);
     Thread.sleep(1000);
     
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
     conn.start();
     String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
index b0d4d1c..6423fce 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
@@ -56,7 +56,8 @@
     conf.setInt("constAdaptor.minSleep", 50);
     
     conf.setInt("chukwaAgent.control.port", 0);
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     RetryListOfCollectors clist = new RetryListOfCollectors(conf);
     clist.add("http://localhost:"+PORTNO+"/chukwa");
     HttpConnector conn = new HttpConnector(agent);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
index eff7660..b067f16 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
@@ -73,7 +73,8 @@
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 500);
     conf.setInt(AdaptorResetThread.TIMEOUT_OPT, ACK_TIMEOUT);
 
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     ChunkCatcherConnector chunks = new ChunkCatcherConnector();
     chunks.start();
     assertEquals(0, agent.adaptorCount());
@@ -207,7 +208,8 @@
       Server collectorServ = startCollectorOnPort(conf, PORTNO, collector);
       Thread.sleep(1000);
       
-      ChukwaAgent agent = new ChukwaAgent(conf);
+      ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
       conn.start();
       String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
index f5f8d26..de34563 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
@@ -66,7 +66,8 @@
     Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
     Thread.sleep(2000); //for collectors to start
     
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     HttpConnector conn = new HttpConnector(agent);
     RetryListOfCollectors clist = new RetryListOfCollectors(conf);
     clist.add("http://localhost:"+(PORTNO+1)+"/");
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java
index 45e29d3..7eaaf05 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java
@@ -38,7 +38,8 @@
 
   protected void setUp() throws ChukwaAgent.AlreadyRunningException {
     config = new Configuration();
-    agent = new ChukwaAgent(config);
+    agent = ChukwaAgent.getAgent(config);
+    agent.start();
     c = new ChukwaAgentController();
     connector = new ChunkCatcherConnector();
     connector.start();
diff --git a/src/test/java/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java b/src/test/java/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java
deleted file mode 100644
index a0ca055..0000000
--- a/src/test/java/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.validationframework;
-
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
-import org.apache.hadoop.chukwa.datacollection.collector.CollectorStub;
-import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-import org.apache.hadoop.chukwa.validationframework.interceptor.ChunkDumper;
-import org.apache.hadoop.chukwa.validationframework.interceptor.SetupTestClasses;
-import org.apache.hadoop.chukwa.validationframework.util.DataOperations;
-
-public class ChukwaAgentToCollectorValidator {
-  public static final int ADD = 100;
-  public static final int VALIDATE = 200;
-
-  private static void usage() {
-    System.out.println("usage ...");
-    System.exit(-1);
-  }
-
-  /**
-   * @param args
-   * @throws Throwable
-   * @throws AlreadyRunningException
-   * @throws IOException
-   */
-  public static void main(String[] args) throws Throwable {
-    if (args.length != 2) {
-      usage();
-    }
-
-    int command = -1;
-
-    if ("-add".equalsIgnoreCase(args[0])) {
-      command = ChukwaAgentToCollectorValidator.ADD;
-    } else if ("-validate".equalsIgnoreCase(args[0])) {
-      command = ChukwaAgentToCollectorValidator.VALIDATE;
-    } else {
-      usage();
-    }
-
-    String chukwaTestRepository = System.getenv("chukwaTestRepository");
-    if (chukwaTestRepository == null) {
-      chukwaTestRepository = "/tmp/chukwaTestRepository/";
-    }
-
-    if (!chukwaTestRepository.endsWith("/")) {
-      chukwaTestRepository += "/";
-    }
-
-    String fileName = args[1];
-
-    String name = null;
-    if (fileName.indexOf("/") >= 0) {
-      name = fileName.substring(fileName.lastIndexOf("/"));
-    } else {
-      name = fileName;
-    }
-
-    String chukwaTestDirectory = chukwaTestRepository + name;
-    String inputFile = chukwaTestDirectory + "/input/" + name;
-    String outputDir = null;
-
-    if (command == ChukwaAgentToCollectorValidator.ADD) {
-      File dir = new File(chukwaTestDirectory + "/input/");
-      if (dir.exists()) {
-        throw new RuntimeException(
-            "a test with the same input file is already there, remove it first");
-      }
-      dir.mkdirs();
-      DataOperations.copyFile(fileName, inputFile);
-      outputDir = "/gold";
-    } else {
-      outputDir = "/" + System.currentTimeMillis();
-    }
-
-    System.out.println("chukwaTestDirectory [" + chukwaTestDirectory + "]");
-    System.out.println("command ["
-        + ((command == ChukwaAgentToCollectorValidator.ADD) ? "ADD"
-            : "VALIDATE") + "]");
-    System.out.println("fileName [" + inputFile + "]");
-
-    ChukwaConfiguration conf = new ChukwaConfiguration(true);
-    String collectorOutputDir = conf.get("chukwaCollector.outputDir");
-
-    prepareAndSendData(chukwaTestDirectory + outputDir, inputFile,
-        collectorOutputDir);
-    extractRawLog(chukwaTestDirectory + outputDir, name, collectorOutputDir);
-    boolean rawLogTestResult = validateRawLogs(chukwaTestDirectory + outputDir,
-        name);
-
-    boolean binLogTestResult = true;
-
-    if (command == ChukwaAgentToCollectorValidator.VALIDATE) {
-      binLogTestResult = validateOutputs(chukwaTestDirectory + outputDir, name);
-    }
-
-    if (rawLogTestResult == true && binLogTestResult == true) {
-      System.out.println("test OK");
-      System.exit(10);
-    } else {
-      System.out.println("test KO");
-      throw new RuntimeException("test failed for file [" + name + "]");
-    }
-  }
-
-  public static void prepareAndSendData(String dataRootFolder,
-      String inputFile, String dataSinkDirectory) throws Throwable {
-
-    ChunkDumper.testRepositoryDumpDir = dataRootFolder + "/";
-
-    SetupTestClasses.setupClasses();
-
-    // clean up the collector outputDir.
-    File collectorDir = new File(dataSinkDirectory);
-    String[] files = collectorDir.list();
-    for (String f : files) {
-      File file = new File(dataSinkDirectory + File.separator + f);
-      file.delete();
-      System.out.println("Deleting previous collectors files: " + f);
-    }
-
-    System.out.println("Starting agent");
-    String[] agentArgs = new String[0];
-    ChukwaAgent.main(agentArgs);
-
-    // Start the collector
-    System.out.println("Starting collector");
-    CollectorStub.main(new String[0]);
-
-    // Start the agent
-    ChukwaAgent agent = ChukwaAgent.getAgent();
-
-    int portno = 9093; // Default
-    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
-    // ADD
-    // org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.
-    // CharFileTailingAdaptorUTF8NewLineEscaped
-    // SysLog
-    // 0 /var/log/messages
-    // 0
-    System.out.println("Adding adaptor");
-    String adaptor = cli.add(
-            "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
-            "AutomatedTestType", "0 " + inputFile, 0);
-
-    cli.remove(adaptor);
-    System.out.println("Adaptor removed");
-    agent.shutdown();
-    System.out.println("Shutting down agent");
-    CollectorStub.jettyServer.stop();
-    System.out.println("Shutting down collector");
-    Thread.sleep(2000);
-  }
-
-  public static void extractRawLog(String dataRootFolder, String fileName,
-      String dataSinkDirectory) throws Exception {
-    // Adaptor output
-    DataOperations
-        .extractRawLogFromDump(dataRootFolder + "/adaptor/", fileName);
-    // Sender output
-    DataOperations.extractRawLogFromDump(dataRootFolder + "/sender/", fileName);
-
-    // Collector output
-    File dir = new File(dataRootFolder + "/collector/");
-    dir.mkdirs();
-
-    File dataSinkDir = new File(dataSinkDirectory);
-    String[] doneFiles = dataSinkDir.list();
-    // Move done file to the final directory
-    for (String f : doneFiles) {
-      String outputFile = null;
-      if (f.endsWith(".done")) {
-        outputFile = fileName + ".done";
-      } else {
-        outputFile = fileName + ".crc";
-      }
-      System.out.println("Moving that file [" + dataSinkDirectory
-          + File.separator + f + "] to [" + dataRootFolder + "/collector/"
-          + outputFile + "]");
-      DataOperations.copyFile(dataSinkDirectory + File.separator + f,
-          dataRootFolder + "/collector/" + outputFile);
-    }
-
-    DataOperations.extractRawLogFromdataSink(ChunkDumper.testRepositoryDumpDir
-        + "/collector/", fileName);
-  }
-
-  public static boolean validateRawLogs(String dataRootFolder, String fileName) {
-    boolean result = true;
-    // Validate Adaptor
-    boolean adaptorMD5 = DataOperations.validateMD5(dataRootFolder
-        + "/../input/" + fileName, dataRootFolder + "/adaptor/" + fileName
-        + ".raw");
-    if (!adaptorMD5) {
-      System.out.println("Adaptor validation failed");
-      result = false;
-    }
-    // Validate Sender
-    boolean senderMD5 = DataOperations.validateMD5(dataRootFolder
-        + "/../input/" + fileName, dataRootFolder + "/sender/" + fileName
-        + ".raw");
-    if (!senderMD5) {
-      System.out.println("Sender validation failed");
-      result = false;
-    }
-    // Validate DataSink
-    boolean collectorMD5 = DataOperations.validateMD5(dataRootFolder
-        + "/../input/" + fileName, dataRootFolder + "/collector/" + fileName
-        + ".raw");
-    if (!collectorMD5) {
-      System.out.println("collector validation failed");
-      result = false;
-    }
-
-    return result;
-  }
-
-  public static boolean validateOutputs(String dataRootFolder, String fileName) {
-    boolean result = true;
-    // Validate Adaptor
-    boolean adaptorMD5 = DataOperations.validateMD5(dataRootFolder
-        + "/../gold/adaptor/" + fileName + ".bin", dataRootFolder + "/adaptor/"
-        + fileName + ".bin");
-    if (!adaptorMD5) {
-      System.out.println("Adaptor bin validation failed");
-      result = false;
-    }
-    // Validate Sender
-    boolean senderMD5 = DataOperations.validateMD5(dataRootFolder
-        + "/../gold/sender/" + fileName + ".bin", dataRootFolder + "/sender/"
-        + fileName + ".bin");
-    if (!senderMD5) {
-      System.out.println("Sender bin validation failed");
-      result = false;
-    }
-    // Validate DataSink
-    // boolean collectorMD5 = DataOperations.validateRawLog(dataRootFolder +
-    // "/../gold/collector/" + fileName + ".done", dataRootFolder +
-    // "/collector/" + fileName + ".done");
-    // if (!collectorMD5)
-    // {
-    // System.out.println("collector bin validation failed");
-    // result = false;
-    // }
-
-    return result;
-  }
-}
\ No newline at end of file
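
The deleted validator's central check, validateMD5, compared the original input
file against the adaptor, sender, and collector outputs by digest. A generic
JDK-only sketch of that comparison; paths and the class name are placeholders:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class Md5Compare {
  static byte[] md5Of(String path) throws IOException, NoSuchAlgorithmException {
    MessageDigest md = MessageDigest.getInstance("MD5");
    InputStream in = new FileInputStream(path);
    try {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        md.update(buf, 0, n); // stream the file through the digest
      }
    } finally {
      in.close();
    }
    return md.digest();
  }

  // True when both files have identical MD5 digests.
  public static boolean sameDigest(String a, String b) throws Exception {
    return Arrays.equals(md5Of(a), md5Of(b));
  }
}
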