CHUKWA-771. Improved code quality issue identified by findbugs. (Eric Yang)
diff --git a/CHANGES.txt b/CHANGES.txt
index 8192ffd..e23338b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,8 @@
BUGS
+ CHUKWA-771. Improved code quality issue identified by findbugs. (Eric Yang)
+
CHUKWA-770. Moved default dashboard population code to login.jsp. (Eric Yang)
CHUKWA-766. Updated license on source files. (Eric Yang)
diff --git a/pom.xml b/pom.xml
index ad931c8..82a7f03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -465,9 +465,9 @@
<goals>
<goal>compile</goal>
</goals>
- <compilerVersion>1.6</compilerVersion>
- <source>1.6</source>
- <target>1.6</target>
+ <compilerVersion>1.7</compilerVersion>
+ <source>1.7</source>
+ <target>1.7</target>
<excludes>
<exclude>**/ChukwaJobTrackerInstrumentation.java</exclude>
</excludes>
@@ -1360,7 +1360,7 @@
</plugin>
<plugin>
<artifactId>maven-pmd-plugin</artifactId>
- <version>2.6</version>
+ <version>3.4</version>
<reportSets>
<reportSet>
<reports>
@@ -1370,13 +1370,13 @@
</reportSet>
</reportSets>
<configuration>
- <targetJdk>1.6</targetJdk>
+ <targetJdk>1.7</targetJdk>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
- <version>2.3.3</version>
+ <version>3.0.1</version>
<configuration>
<threshold>Normal</threshold>
<effort>Max</effort>
diff --git a/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java b/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java
index 1f184c7..ed90d4a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java
+++ b/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -31,7 +32,7 @@
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
- public static int PROTOCOL_VERSION = 1;
+ public final static int PROTOCOL_VERSION = 1;
protected DataFactory dataFactory = DataFactory.getInstance();
private String source = "";
@@ -249,9 +250,15 @@
return w;
}
- // FIXME: should do something better here, but this is OK for debugging
public String toString() {
- return source + ":" + streamName + ":" + new String(data) + "/" + seqID;
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(source);
+ buffer.append(":");
+ buffer.append(streamName);
+ buffer.append(new String(data, Charset.forName("UTF-8")));
+ buffer.append("/");
+ buffer.append(seqID);
+ return buffer.toString();
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
index 7cb10ec..ec37f7a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
@@ -45,7 +45,7 @@
{
private static Log log = LogFactory.getLog(FSMBuilder.class);
protected static final String SEP = "/";
- protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];
+ protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];
private final Pattern ipPattern =
Pattern.compile(".*[a-zA-Z\\-_:\\/]([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z0-9\\-_:\\/].*");
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
index de28597..d3a1656 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
@@ -75,13 +75,13 @@
* These are used for the add_info TreeMap; keys not listed here are automatically
* prepended with "COUNTER_"
*/
- protected static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
+ final static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
- protected static String JCDF_ID1 = "JCDF_ID1";
- protected static String JCDF_ID2 = "JCDF_ID2";
- protected static String JCDF_EDGE_TIME = "JCDF_E_TIME";
- protected static String JCDF_EDGE_VOL = "JCDF_E_VOL";
- protected static String JCDF_SEP = "@";
+ protected final static String JCDF_ID1 = "JCDF_ID1";
+ protected final static String JCDF_ID2 = "JCDF_ID2";
+ protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME";
+ protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL";
+ protected final static String JCDF_SEP = "@";
/**
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
index fddfeb1..234f8bb 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
@@ -23,9 +23,9 @@
public static final int FILESYSTEM_FSM = 1;
public static final int MAPREDUCE_FSM_INCOMPLETE = 2;
public static final int FILESYSTEM_FSM_INCOMPLETE = 3;
- public static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" };
+ static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" };
public FSMType() { this.val = 0; }
public FSMType(int newval) { this.val = newval; }
public int val;
- public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }
-}
\ No newline at end of file
+ public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
index a2fe353..96e1bd5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
@@ -25,9 +25,9 @@
public static final int WRITE_LOCAL = 3;
public static final int WRITE_REMOTE = 4;
public static final int WRITE_REPLICATED = 5;
- public static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"};
+ static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"};
public HDFSState() { this.val = 1; }
public HDFSState(int newval) { this.val = newval; }
public int val;
- public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }
-}
\ No newline at end of file
+ public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
index 883225a..3de268e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
@@ -47,7 +47,7 @@
private static Log log = LogFactory.getLog(FSMBuilder.class);
protected static final String SEP = "/";
- protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
+ protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
/*
* Helper function for mapper to populate TreeMap of FSMIntermedEntr
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
index ae06be5..f059049 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
@@ -27,10 +27,10 @@
public static final int REDUCE_REDUCER = 5;
public static final int SHUFFLE_LOCAL = 6;
public static final int SHUFFLE_REMOTE = 7;
- public static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT",
+ static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT",
"REDUCE_SORT", "REDUCE_REDUCER", "SHUFFLE_LOCAL", "SHUFFLE_REMOTE"};
public MapRedState() { this.val = 0; }
public MapRedState(int newval) { this.val = newval; }
public int val;
- public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }
-}
\ No newline at end of file
+ public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
index 56e8362..7298a5c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
@@ -23,9 +23,9 @@
public static final int STATE_START = 1;
public static final int STATE_END = 2;
public static final int STATE_INSTANT = 3;
- public static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"};
+ static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"};
public StateType() { this.val = 0; }
public StateType(int newval) { this.val = newval; }
public int val;
- public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }
-}
\ No newline at end of file
+ public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); }
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
index 785ba8b..188b076 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
@@ -46,7 +46,7 @@
{
private static Log log = LogFactory.getLog(FSMBuilder.class);
protected static final String SEP = "/";
- protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
+ protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
private final Pattern ipPattern =
Pattern.compile("([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*");
@@ -149,8 +149,8 @@
String [] k = key.getKey().split("/");
start_rec.time_orig_epoch = k[0];
- start_rec.time_orig = (new Long(actual_time_ms)).toString(); // not actually used
- start_rec.timestamp = (new Long(actual_time_ms)).toString();
+ start_rec.time_orig = (Long.valueOf(actual_time_ms)).toString(); // not actually used
+ start_rec.timestamp = (Long.valueOf(actual_time_ms)).toString();
start_rec.time_end = new String("");
start_rec.time_start = new String(start_rec.timestamp);
diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
index 36b5e5e..87bf40c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
+++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
@@ -317,7 +317,7 @@
return this.states.length;
}
public String [] getStates() {
- return this.states;
+ return this.states.clone();
}
}
@@ -903,4 +903,4 @@
return rs_tab;
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java
index f92c9e9..1aa50af 100644
--- a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java
+++ b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java
@@ -20,12 +20,16 @@
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
@@ -48,7 +52,7 @@
public static String getContents(File aFile) {
StringBuffer contents = new StringBuffer();
try {
- BufferedReader input = new BufferedReader(new FileReader(aFile));
+ BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
try {
String line = null; // not declared within while loop
while ((line = input.readLine()) != null) {
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
index ba68d73..1a0e2a3 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
@@ -26,6 +26,7 @@
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
+import java.nio.charset.Charset;
import java.util.*;
/**
@@ -87,7 +88,7 @@
result.append(o.get("exitValue"));
result.append(": ");
result.append((String) o.get("stdout"));
- data = result.toString().getBytes();
+ data = result.toString().getBytes(Charset.forName("UTF-8"));
} else {
String stdout = (String) o.get("stdout");
data = stdout.getBytes();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
index c2792a7..313abc8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java
@@ -20,6 +20,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -72,7 +73,7 @@
status.put("components", array);
if(_shouldUseConnector){
ChunkImpl chunk = new ChunkImpl(type, STREAM_NAME, seqId, status.toString()
- .getBytes(), HeartbeatAdaptor.this);
+ .getBytes(Charset.forName("UTF-8")), HeartbeatAdaptor.this);
dest.add(chunk);
} else {
sendDirectly(status.toString());
@@ -83,7 +84,7 @@
private void sendDirectly(String data) {
DataOutputStream dos = null;
Socket sock = null;
- byte[] bdata = data.getBytes();
+ byte[] bdata = data.getBytes(Charset.forName("UTF-8"));
try {
sock = new Socket(_host, _port);
dos = new DataOutputStream(sock.getOutputStream());
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
index 792957c..c07f6fa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
@@ -20,8 +20,11 @@
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
import java.rmi.ConnectException;
import java.util.Calendar;
import java.util.HashMap;
@@ -97,11 +100,11 @@
sb.append(File.separator);
}
sb.append("jmxremote.password");
- String jmx_pw_file = sb.toString();
+ File jmx_pw_file = new File(sb.toString());
shutdown = false;
while(!shutdown){
try{
- BufferedReader br = new BufferedReader(new FileReader(jmx_pw_file));
+ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8")));
String[] creds = br.readLine().split(" ");
Map<String, String[]> env = new HashMap<String, String[]>();
env.put(JMXConnector.CREDENTIALS, creds);
@@ -202,7 +205,7 @@
}
}
- byte[] data = json.toString().getBytes();
+ byte[] data = json.toString().getBytes(Charset.forName("UTF-8"));
sendOffset+=data.length;
ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
index b401f2e..39af580 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.Calendar;
import java.util.TimeZone;
@@ -164,7 +165,7 @@
}
private int processMetrics() {
- return addChunkToReceiver(getOozieMetrics().getBytes());
+ return addChunkToReceiver(getOozieMetrics().getBytes(Charset.forName("UTF-8")));
}
private String getOozieMetrics() {
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
index 60ac50d..8e3fd8a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
@@ -19,9 +19,9 @@
package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.io.FileInputStream;
+import java.nio.charset.Charset;
import java.security.KeyStore;
import java.security.SecureRandom;
-
import java.util.Calendar;
import java.util.TimeZone;
import java.util.Timer;
@@ -33,6 +33,7 @@
import org.apache.log4j.Logger;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
+
import static org.apache.hadoop.chukwa.datacollection.agent.ChukwaConstants.*;
import com.sun.jersey.api.client.Client;
@@ -44,7 +45,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
-
import javax.ws.rs.core.MediaType;
public class RestAdaptor extends AbstractAdaptor {
@@ -75,7 +75,7 @@
resource = c.resource(uri);
bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
String.class);
- byte[] data = bean.getBytes();
+ byte[] data = bean.getBytes(Charset.forName("UTF-8"));
sendOffset += data.length;
ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
index 705d310..b37be9c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
@@ -22,6 +22,7 @@
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.net.*;
+import java.nio.charset.Charset;
import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -132,7 +133,7 @@
while(running) {
// read an event from the wire
event = (LoggingEvent) ois.readObject();
- byte[] bytes = layout.format(event).getBytes();
+ byte[] bytes = layout.format(event).getBytes(Charset.forName("UTF-8"));
bytesReceived=bytes.length;
Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this);
dest.add(c);
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
index 258cdb5..07f6c66 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.*;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
@@ -63,7 +64,7 @@
source.append(dp.getAddress());
String dataType = type;
byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
- String rawPRI = new String(trimmedBuf, 1, 4);
+ String rawPRI = new String(trimmedBuf, 1, 4, Charset.forName("UTF-8"));
int i = rawPRI.indexOf(">");
if (i <= 3 && i > -1) {
String priorityStr = rawPRI.substring(0,i);
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
index 4e5fcdf..5fea073 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
@@ -34,7 +34,7 @@
public static int MAX_RETRIES = 300;
- public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
+ static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
private int attempts = 0;
private long gracefulPeriodExpired = 0l;
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
index 1689e0e..9da09d5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
@@ -51,10 +51,10 @@
public static final String MAX_READ_SIZE_OPT =
"chukwaAgent.fileTailingAdaptor.maxReadSize";
- public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+ static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
static Logger log;
- protected static FileTailer tailer;
+ static FileTailer tailer;
static {
tailer = null;
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
index 2ac669e..b0ef917 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
@@ -22,6 +22,8 @@
import javax.jms.Message;
import javax.jms.JMSException;
+
+import java.nio.charset.Charset;
import java.util.ArrayList;
/**
@@ -141,7 +143,7 @@
return null;
}
- return sb.toString().getBytes();
+ return sb.toString().getBytes(Charset.forName("UTF-8"));
}
/**
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java
index d23675e..5aed762 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.sigar;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.TimerTask;
@@ -203,13 +204,13 @@
json.put("disk", fsList);
}
json.put("timestamp", System.currentTimeMillis());
- byte[] data = json.toString().getBytes();
+ byte[] data = json.toString().getBytes(Charset.forName("UTF-8"));
sendOffset += data.length;
ChunkImpl c = new ChunkImpl("SystemMetrics", "Sigar", sendOffset, data, systemMetrics);
if(!skip) {
receiver.add(c);
}
- } catch (Exception se) {
+ } catch (InterruptedException se) {
log.error(ExceptionUtil.getStackTrace(se));
}
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
index 1160fd3..dda7888 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
@@ -26,7 +26,9 @@
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.*;
+import java.nio.charset.Charset;
import java.util.Map;
+
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.log4j.Logger;
@@ -70,7 +72,7 @@
public void run() {
try {
InputStream in = connection.getInputStream();
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
+ BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
PrintStream out = new PrintStream(new BufferedOutputStream(connection
.getOutputStream()));
String cmd = null;
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
index 7dad2d7..4a2e996 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
@@ -30,6 +30,7 @@
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -71,59 +72,79 @@
// boolean WRITE_CHECKPOINTS = true;
static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
- private static Logger log = Logger.getLogger(ChukwaAgent.class);
- private OffsetStatsManager adaptorStatsManager = null;
+ private final static Logger log = Logger.getLogger(ChukwaAgent.class);
+ private OffsetStatsManager<Adaptor> adaptorStatsManager = null;
private Timer statsCollector = null;
- private static volatile Configuration conf = null;
- private static volatile ChukwaAgent agent = null;
+ private static Configuration conf = null;
+ private volatile static ChukwaAgent agent = null;
public Connector connector = null;
+ private boolean stopped = false;
- protected ChukwaAgent() throws AlreadyRunningException {
- this(new ChukwaConfiguration());
+ private ChukwaAgent() {
+ agent = new ChukwaAgent(new ChukwaConfiguration());
}
- public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
- ChukwaAgent.agent = this;
+ private ChukwaAgent(Configuration conf) {
+ agent = this;
ChukwaAgent.conf = conf;
-
// almost always just reading this; so use a ConcurrentHM.
// since we wrapped the offset, it's not a structural mod.
adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
adaptorsByName = new HashMap<String, Adaptor>();
checkpointNumber = 0;
+ stopped = false;
+ }
- boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
+ public static ChukwaAgent getAgent() {
+ if(agent == null || agent.isStopped()) {
+ agent = new ChukwaAgent();
+ }
+ return agent;
+ }
+
+ public static ChukwaAgent getAgent(Configuration conf) {
+ if(agent == null || agent.isStopped()) {
+ agent = new ChukwaAgent(conf);
+ }
+ return agent;
+ }
+
+ public void start() throws AlreadyRunningException {
+ boolean checkPointRestore = conf.getBoolean(
"chukwaAgent.checkpoint.enabled", true);
- CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+ checkPointBaseName = conf.get("chukwaAgent.checkpoint.name",
"chukwa_checkpoint_");
- final int CHECKPOINT_INTERVAL_MS = conf.getInt(
+ final int checkPointIntervalMs = conf.getInt(
"chukwaAgent.checkpoint.interval", 5000);
- final int STATS_INTERVAL_MS = conf.getInt(
+ final int statsIntervalMs = conf.getInt(
"chukwaAgent.stats.collection.interval", 10000);
- final int STATS_DATA_TTL_MS = conf.getInt(
+ int statsDataTTLMs = conf.getInt(
"chukwaAgent.stats.data.ttl", 1200000);
if (conf.get("chukwaAgent.checkpoint.dir") != null)
checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
else
- DO_CHECKPOINT_RESTORE = false;
+ checkPointRestore = false;
if (checkpointDir != null && !checkpointDir.exists()) {
- checkpointDir.mkdirs();
+ boolean result = checkpointDir.mkdirs();
+ if(!result) {
+ log.error("Failed to create check point directory.");
+ }
}
- tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+ String tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
- log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+ log.info("Config - CHECKPOINT_BASE_NAME: [" + checkPointBaseName + "]");
log.info("Config - checkpointDir: [" + checkpointDir + "]");
- log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+ log.info("Config - CHECKPOINT_INTERVAL_MS: [" + checkPointIntervalMs
+ "]");
- log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
- log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
+ log.info("Config - DO_CHECKPOINT_RESTORE: [" + checkPointRestore + "]");
+ log.info("Config - STATS_INTERVAL_MS: [" + statsIntervalMs + "]");
log.info("Config - tags: [" + tags + "]");
- if (DO_CHECKPOINT_RESTORE) {
- log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+ if (checkPointRestore) {
+ log.info("checkpoints are enabled, period is " + checkPointIntervalMs);
}
File initialAdaptors = null;
@@ -131,7 +152,7 @@
initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
try {
- if (DO_CHECKPOINT_RESTORE) {
+ if (checkPointRestore) {
restoreFromCheckpoint();
}
} catch (IOException e) {
@@ -152,44 +173,31 @@
// another agent is running.
controlSock.start(); // this sets us up as a daemon
log.info("control socket started on port " + controlSock.portno);
-
- // start the HTTP server with stats collection
- try {
- this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
- this.statsCollector = new Timer("ChukwaAgent Stats Collector");
-
- startHttpServer(conf);
-
- statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
- STATS_INTERVAL_MS, STATS_INTERVAL_MS);
- } catch (Exception e) {
- log.error("Couldn't start HTTP server", e);
- throw new RuntimeException(e);
- }
-
- // shouldn't start checkpointing until we're finishing launching
- // adaptors on boot
- if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
- checkpointer = new Timer();
- checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
- }
} catch (IOException e) {
log.info("failed to bind to socket; aborting agent launch", e);
throw new AlreadyRunningException();
}
- }
+ // start the HTTP server with stats collection
+ try {
+ adaptorStatsManager = new OffsetStatsManager<Adaptor>(statsDataTTLMs);
+ statsCollector = new Timer("ChukwaAgent Stats Collector");
- public static ChukwaAgent getAgent() {
- if(agent == null) {
- try {
- agent = new ChukwaAgent();
- } catch(AlreadyRunningException e) {
- log.error("Chukwa Agent is already running", e);
- agent = null;
- }
- }
- return agent;
+ startHttpServer(conf);
+
+ statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
+ statsIntervalMs, statsIntervalMs);
+ } catch (Exception e) {
+ log.error("Couldn't start HTTP server", e);
+ throw new RuntimeException(e);
+ }
+
+ // shouldn't start check pointing until we're finishing launching
+ // adaptors on boot
+ if (checkPointIntervalMs > 0 && checkpointDir != null) {
+ checkpointer = new Timer();
+ checkpointer.schedule(new CheckpointTask(), 0, checkPointIntervalMs);
+ }
}
// doesn't need an equals(), comparator, etc
@@ -219,17 +227,16 @@
}
}
- private final Map<Adaptor, Offset> adaptorPositions;
+ private static Map<Adaptor, Offset> adaptorPositions;
// basically only used by the control socket thread.
//must be locked before access
- private final Map<String, Adaptor> adaptorsByName;
+ private static Map<String, Adaptor> adaptorsByName;
private File checkpointDir; // lock this object to indicate checkpoint in
// progress
- private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
+ private String checkPointBaseName; // base filename for checkpoint files
// checkpoints
- private static String tags = "";
private Timer checkpointer;
private volatile boolean needNewCheckpoint = false; // set to true if any
@@ -238,13 +245,13 @@
private int checkpointNumber; // id number of next checkpoint.
// should be protected by grabbing lock on checkpointDir
- private final AgentControlSocketListener controlSock;
+ private AgentControlSocketListener controlSock;
public int getControllerPort() {
return controlSock.getPort();
}
- public OffsetStatsManager getAdaptorStatsManager() {
+ public OffsetStatsManager<Adaptor> getAdaptorStatsManager() {
return adaptorStatsManager;
}
@@ -261,12 +268,10 @@
System.exit(0);
}
- conf = ChukwaUtil.readConfiguration();
- agent = new ChukwaAgent(conf);
-
+ Configuration conf = ChukwaUtil.readConfiguration();
+ agent = ChukwaAgent.getAgent(conf);
if (agent.anotherAgentIsRunning()) {
- System.out
- .println("another agent is running (or port has been usurped). "
+ log.error("another agent is running (or port has been usurped). "
+ "Bailing out now");
throw new AlreadyRunningException();
}
@@ -286,11 +291,12 @@
"org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
agent.connector = (Connector) Class.forName(connectorType).newInstance();
}
+ agent.start();
agent.connector.start();
log.info("local agent started on port " + agent.getControlSock().portno);
- //System.out.close();
- //System.err.close();
+ System.out.close();
+ System.err.close();
} catch (AlreadyRunningException e) {
log.error("agent started already on this machine with same portno;"
+ " bailing out");
@@ -304,7 +310,11 @@
}
private boolean anotherAgentIsRunning() {
- return !controlSock.isBound();
+ boolean result = false;
+ if(controlSock!=null) {
+ result = !controlSock.isBound();
+ }
+ return result;
}
/**
@@ -475,30 +485,26 @@
synchronized (checkpointDir) {
String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
public boolean accept(File dir, String name) {
- return name.startsWith(CHECKPOINT_BASE_NAME);
+ return name.startsWith(checkPointBaseName);
}
});
if (checkpointNames == null) {
log.error("Unable to list files in checkpoint dir");
return false;
- }
- if (checkpointNames.length == 0) {
+ } else if (checkpointNames.length == 0) {
log.info("No checkpoints found in " + checkpointDir);
return false;
- }
-
- if (checkpointNames.length > 2)
+ } else if (checkpointNames.length > 2) {
log.warn("expected at most two checkpoint files in " + checkpointDir
+ "; saw " + checkpointNames.length);
- else if (checkpointNames.length == 0)
- return false;
+ }
String lowestName = null;
int lowestIndex = Integer.MAX_VALUE;
for (String n : checkpointNames) {
int index = Integer
- .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
+ .parseInt(n.substring(checkPointBaseName.length()));
if (index < lowestIndex) {
lowestName = n;
lowestIndex = index;
@@ -516,7 +522,7 @@
IOException {
log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
BufferedReader br = new BufferedReader(new InputStreamReader(
- new FileInputStream(checkpoint)));
+ new FileInputStream(checkpoint), Charset.forName("UTF-8")));
String cmd = null;
while ((cmd = br.readLine()) != null)
processAddCommand(cmd);
@@ -534,20 +540,23 @@
log.info("writing checkpoint " + checkpointNumber);
FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
- CHECKPOINT_BASE_NAME + checkpointNumber));
+ checkPointBaseName + checkpointNumber));
PrintWriter out = new PrintWriter(new BufferedWriter(
- new OutputStreamWriter(fos)));
+ new OutputStreamWriter(fos, Charset.forName("UTF-8"))));
for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
}
out.close();
- File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
+ File lastCheckpoint = new File(checkpointDir, checkPointBaseName
+ (checkpointNumber - 1));
log.debug("hopefully removing old checkpoint file "
+ lastCheckpoint.getAbsolutePath());
- lastCheckpoint.delete();
+ boolean result = lastCheckpoint.delete();
+ if(!result) {
+ log.warn("Unable to delete lastCheckpoint file: "+lastCheckpoint.getAbsolutePath());
+ }
checkpointNumber++;
}
}
@@ -729,11 +738,27 @@
adaptorsByName.clear();
adaptorPositions.clear();
adaptorStatsManager.clear();
+ agent.stop();
if (exit)
System.exit(0);
}
/**
+ * Set agent into stop state.
+ */
+ private void stop() {
+ stopped = true;
+ }
+
+ /**
+ * Check if agent is in stop state.
+ * @return true if agent is in stop state.
+ */
+ private boolean isStopped() {
+ return stopped;
+ }
+
+ /**
* Returns the control socket for this agent.
*/
private AgentControlSocketListener getControlSock() {
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java
index cbabbc0..f549614 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java
@@ -41,33 +41,24 @@
private static ChukwaRestServer instance = null;
- public static void startInstance(Configuration conf) throws Exception{
+ public static synchronized void startInstance(Configuration conf) throws Exception{
if(instance == null){
- synchronized(ChukwaRestServer.class) {
- if(instance == null){
- instance = new ChukwaRestServer(conf);
- instance.start();
- }
- }
+ instance = new ChukwaRestServer(conf);
+ instance.start();
}
}
-
- public static void stopInstance() throws Exception {
+
+ public static synchronized void stopInstance() throws Exception {
if(instance != null) {
- synchronized(ChukwaRestServer.class) {
- if(instance != null){
- instance.stop();
- instance = null;
- }
- }
+ instance.stop();
+ instance = null;
}
-
}
-
+
private ChukwaRestServer(Configuration conf){
this.conf = conf;
}
-
+
private void start() throws Exception{
int portNum = conf.getInt(AGENT_HTTP_PORT, 9090);
String jaxRsAddlPackages = conf.get(AGENT_REST_CONTROLLER_PACKAGES);
@@ -127,7 +118,7 @@
log.info("started Chukwa http agent interface on port " + portNum);
}
-
+
private void stop() throws Exception{
jettyServer.stop();
log.info("Successfully stopped Chukwa http agent interface");
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java
index 5d3f71a..b7c912b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java
@@ -23,13 +23,13 @@
public class Examples {
public static final AdaptorConfig CREATE_ADAPTOR_SAMPLE = new AdaptorConfig();
public static final AdaptorInfo ADAPTOR_STATUS_SAMPLE = new AdaptorInfo();
- public static final List<AdaptorAveragedRate> ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
+ final static List<AdaptorAveragedRate> ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_MINUTE = new AdaptorAveragedRate();
public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_FIVE_MINUTE = new AdaptorAveragedRate();
public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_TEN_MINUTE = new AdaptorAveragedRate();
public static final AdaptorInfo SYS_ADAPTOR_STATUS_SAMPLE = new AdaptorInfo();
- public static final List<AdaptorAveragedRate> SYS_ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
+ final static List<AdaptorAveragedRate> SYS_ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>();
public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_MINUTE = new AdaptorAveragedRate();
public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_FIVE_MINUTE = new AdaptorAveragedRate();
public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_TEN_MINUTE = new AdaptorAveragedRate();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
deleted file mode 100644
index 2a17417..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.datacollection.collector;
-
-
-import org.mortbay.jetty.*;
-import org.mortbay.jetty.nio.SelectChannelConnector;
-import org.mortbay.jetty.servlet.*;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
-import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
-import org.apache.hadoop.chukwa.datacollection.writer.*;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import javax.servlet.http.HttpServlet;
-import java.io.File;
-import java.util.*;
-
-@Deprecated
-public class CollectorStub {
-
- static int THREADS = 120;
- public static Server jettyServer = null;
-
- public static void main(String[] args) {
-
- try {
- if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) {
- System.out.println("usage: Normally you should just invoke CollectorStub without arguments.");
- System.out.println("A number of options can be specified here for debugging or special uses. e.g.: ");
- System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>"
- + "\n\tservlet=<classname>@path");
- System.out.println("Command line options will override normal configuration.");
- System.exit(0);
- }
-
- ChukwaConfiguration conf = new ChukwaConfiguration();
-
- try {
- Configuration collectorConf = new Configuration(false);
- collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml"));
- collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml"));
- } catch(Exception e) {e.printStackTrace();}
-
- int portNum = conf.getInt("chukwaCollector.http.port", 9999);
- THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
-
- // pick a writer.
- ChukwaWriter w = null;
- Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
- ServletCollector servletCollector = new ServletCollector(conf);
- for(String arg: args) {
- if(arg.startsWith("writer=")) { //custom writer class
- String writerCmd = arg.substring("writer=".length());
- if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) {
- boolean verbose = !writerCmd.equals("pretend-quietly");
- w = new ConsoleWriter(verbose);
- w.init(conf);
- servletCollector.setWriter(w);
- } else
- conf.set("chukwaCollector.writerClass", writerCmd);
- } else if(arg.startsWith("servlet=")) { //adding custom servlet
- String servletCmd = arg.substring("servlet=".length());
- String[] halves = servletCmd.split("@");
- try {
- Class<?> servletClass = Class.forName(halves[0]);
- HttpServlet srvlet = (HttpServlet) servletClass.newInstance();
- if(!halves[1].startsWith("/"))
- halves[1] = "/" + halves[1];
- servletsToAdd.put(halves[1], srvlet);
- } catch(Exception e) {
- e.printStackTrace();
- }
- } else if(arg.startsWith("portno=")) {
- portNum = Integer.parseInt(arg.substring("portno=".length()));
- } else { //unknown arg
- System.out.println("WARNING: unknown command line arg " + arg);
- System.out.println("Invoke collector with command line arg 'help' for usage");
- }
- }
-
- // Set up jetty connector
- SelectChannelConnector jettyConnector = new SelectChannelConnector();
- jettyConnector.setLowResourcesConnections(THREADS - 10);
- jettyConnector.setLowResourceMaxIdleTime(1500);
- jettyConnector.setPort(portNum);
-
- // Set up jetty server proper, using connector
- jettyServer = new Server(portNum);
- jettyServer.setConnectors(new Connector[] { jettyConnector });
- org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
- pool.setMaxThreads(THREADS);
- jettyServer.setThreadPool(pool);
-
- // Add the collector servlet to server
- Context root = new Context(jettyServer, "/", Context.SESSIONS);
- root.addServlet(new ServletHolder(servletCollector), "/*");
-
- if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
- root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
-
- if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
- root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
-
-
- root.setAllowNullPathInfo(false);
-
- // Add in any user-specified servlets
- for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) {
- root.addServlet(new ServletHolder(e.getValue()), e.getKey());
- }
-
- // And finally, fire up the server
- jettyServer.start();
- jettyServer.setStopAtShutdown(true);
-
- System.out.println("started Chukwa http collector on port " + portNum);
- System.out.close();
- System.err.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
-}
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
index c3e72d2..d4c2df4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
@@ -38,7 +38,7 @@
private static final long serialVersionUID = -4627538252371890849L;
- protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
+ protected final static Logger log = Logger.getLogger(CommitCheckServlet.class);
CommitCheckThread commitCheck;
Configuration conf;
//interval at which to scan the filesystem, ms
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
index b1ead05..613fa3e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
@@ -67,7 +67,7 @@
long totalStoredSize = 0;
private static final long serialVersionUID = -4602082382919009285L;
- protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
+ protected final static Logger log = Logger.getLogger(LogDisplayServlet.class);
public LogDisplayServlet() {
conf = new Configuration();
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
index 03a88df..69e3566 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
@@ -27,6 +27,7 @@
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -35,6 +36,7 @@
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.log4j.Logger;
@@ -128,14 +130,14 @@
e.printStackTrace();
}
PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
- .getOutputStream()));
+ .getOutputStream(), Charset.forName("UTF-8")));
if(id != null)
bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
else
bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
bw.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(s
- .getInputStream()));
+ .getInputStream(), Charset.forName("UTF-8")));
String resp = br.readLine();
if (resp != null) {
String[] fields = resp.split(" ");
@@ -153,15 +155,14 @@
s.setSoTimeout(60000);
} catch (SocketException e) {
log.warn("Error while settin soTimeout to 60000");
- e.printStackTrace();
}
PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
- .getOutputStream()));
+ .getOutputStream(), Charset.forName("UTF-8")));
bw.println("SHUTDOWN " + id);
bw.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(s
- .getInputStream()));
+ .getInputStream(), Charset.forName("UTF-8")));
String resp = br.readLine();
if (resp == null || !resp.startsWith("OK")) {
log.error("adaptor unregister error, id: " + id);
@@ -348,12 +349,12 @@
e.printStackTrace();
}
PrintWriter bw = new PrintWriter(
- new OutputStreamWriter(s.getOutputStream()));
+ new OutputStreamWriter(s.getOutputStream(), Charset.forName("UTF-8")));
bw.println("LIST");
bw.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(s
- .getInputStream()));
+ .getInputStream(), Charset.forName("UTF-8")));
String ln;
Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
while ((ln = br.readLine()) != null) {
@@ -370,11 +371,13 @@
// -
// paren
long offset = Long.parseLong(parts[parts.length - 1]);
- String tmpParams = parts[3];
+ StringBuilder tmpParams = new StringBuilder();
+ tmpParams.append(parts[3]);
for (int i = 4; i < parts.length - 1; i++) {
- tmpParams += " " + parts[i];
+ tmpParams.append(" ");
+ tmpParams.append(parts[i]);
}
- listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams,
+ listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams.toString(),
offset));
}
}
@@ -563,8 +566,7 @@
if (adaptorID != null) {
log.info("Successfully added adaptor, id is:" + adaptorID);
} else {
- System.err.println("Agent reported failure to add adaptor, adaptor id returned was:"
- + adaptorID);
+ log.error("Agent reported failure to add adaptor.");
}
return adaptorID;
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
index 11e9305..6f818e4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
@@ -44,7 +44,7 @@
*/
public class AsyncAckSender extends ChukwaHttpSender{
- protected static Logger log = Logger.getLogger(AsyncAckSender.class);
+ protected final static Logger log = Logger.getLogger(AsyncAckSender.class);
/*
* Represents the state required for an asynchronous ack.
*
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
index 22460d7..1c8c3d2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
@@ -26,6 +26,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -354,7 +355,7 @@
}
});
- pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
+ pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT));
method.setParams(pars);
method.setPath(dest);
@@ -385,7 +386,7 @@
// Get the response body
byte[] resp_buf = method.getResponseBody();
rstream = new ByteArrayInputStream(resp_buf);
- BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
+ BufferedReader br = new BufferedReader(new InputStreamReader(rstream, Charset.forName("UTF-8")));
String line;
List<String> resp = new ArrayList<String>();
while ((line = br.readLine()) != null) {
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
index 30442e2..dea2d07 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.chukwa.datacollection.writer.hbase;
-import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -36,6 +36,7 @@
public class Reporter {
private ArrayList<Put> meta = new ArrayList<Put>();
private MessageDigest md5 = null;
+ private final static Charset UTF8 = Charset.forName("UTF-8");
public Reporter() throws NoSuchAlgorithmException {
md5 = MessageDigest.getInstance("md5");
@@ -48,11 +49,11 @@
try {
Type metaType = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> meta = new HashMap<String, String>();
- meta.put("sig", new String(value, "UTF-8"));
+ meta.put("sig", new String(value, UTF8));
meta.put("type", "source");
Gson gson = new Gson();
buffer = gson.toJson(meta, metaType);
- put(type.getBytes(), source.getBytes(), buffer.toString().getBytes());
+ put(type.getBytes(UTF8), source.getBytes(UTF8), buffer.toString().getBytes(UTF8));
} catch (Exception e) {
Log.warn("Error encoding metadata.");
Log.warn(e);
@@ -70,7 +71,7 @@
meta.put("type", "metric");
Gson gson = new Gson();
buffer = gson.toJson(meta, metaType);
- put(type.getBytes(), metric.getBytes(), buffer.toString().getBytes());
+ put(type.getBytes(UTF8), metric.getBytes(UTF8), buffer.toString().getBytes(UTF8));
} catch (Exception e) {
Log.warn("Error encoding metadata.");
Log.warn(e);
@@ -78,12 +79,12 @@
}
public void put(String key, String source, String info) {
- put(key.getBytes(), source.getBytes(), info.getBytes());
+ put(key.getBytes(UTF8), source.getBytes(UTF8), info.getBytes(UTF8));
}
public void put(byte[] key, byte[] source, byte[] info) {
Put put = new Put(key);
- put.addColumn("k".getBytes(), source, info);
+ put.addColumn("k".getBytes(UTF8), source, info);
meta.add(put);
}
@@ -97,25 +98,20 @@
private byte[] getHash(String key) {
byte[] hash = new byte[5];
- System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5);
+ System.arraycopy(md5.digest(key.getBytes(UTF8)), 0, hash, 0, 5);
return hash;
}
public void putClusterName(String type, String clusterName) {
byte[] value = getHash(clusterName);
String buffer;
- try {
- Type metaType = new TypeToken<Map<String, String>>(){}.getType();
- Map<String, String> meta = new HashMap<String, String>();
- meta.put("sig", new String(value, "UTF-8"));
- meta.put("type", "cluster");
- Gson gson = new Gson();
- buffer = gson.toJson(meta, metaType);
- put(type.getBytes(), clusterName.getBytes(), buffer.toString().getBytes());
- } catch (UnsupportedEncodingException e) {
- Log.warn("Error encoding metadata.");
- Log.warn(e);
- }
+ Type metaType = new TypeToken<Map<String, String>>(){}.getType();
+ Map<String, String> meta = new HashMap<String, String>();
+ meta.put("sig", new String(value, UTF8));
+ meta.put("type", "cluster");
+ Gson gson = new Gson();
+ buffer = gson.toJson(meta, metaType);
+ put(type.getBytes(UTF8), clusterName.getBytes(UTF8), buffer.toString().getBytes(UTF8));
}
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index f67fe87..bf64b24 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.chukwa.datacollection.writer.solr;
+import java.io.IOException;
+import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -34,12 +36,13 @@
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;
public class SolrWriter extends PipelineableWriter {
private static Logger log = Logger.getLogger(SolrWriter.class);
- private static CloudSolrServer server;
+ private CloudSolrServer server;
private final static String ID = "id";
private final static String SEQ_ID = "seqId";
private final static String DATA_TYPE = "type";
@@ -64,8 +67,10 @@
throw new WriterException("Solr server address is not defined.");
}
String collection = c.get("solr.collection", "logs");
- server = new CloudSolrServer(serverName);
- server.setDefaultCollection(collection);
+ if(server == null) {
+ server = new CloudSolrServer(serverName);
+ server.setDefaultCollection(collection);
+ }
}
@Override
@@ -84,10 +89,10 @@
doc.addField(SOURCE, chunk.getSource());
doc.addField(SEQ_ID, chunk.getSeqID());
doc.addField(DATA_TYPE, chunk.getDataType());
- doc.addField(DATA, new String(chunk.getData()));
+ doc.addField(DATA, new String(chunk.getData(), Charset.forName("UTF-8")));
// TODO: improve parsing logic for more sophisticated tagging
- String data = new String(chunk.getData());
+ String data = new String(chunk.getData(), Charset.forName("UTF-8"));
Matcher m = userPattern.matcher(data);
if(m.find()) {
doc.addField(USER, m.group(1));
@@ -109,7 +114,7 @@
}
server.add(doc);
server.commit();
- } catch (Exception e) {
+ } catch (SolrServerException | IOException e) {
log.warn("Failed to store data to Solr Cloud.");
log.warn(ExceptionUtil.getStackTrace(e));
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 8075f4d..f828ff1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -22,6 +22,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -71,19 +72,20 @@
static double RESOLUTION = 360;
static int MINUTE = 60000; //60 milliseconds
final static int SECOND = (int) TimeUnit.SECONDS.toMillis(1);
+ private final static Charset UTF8 = Charset.forName("UTF-8");
- static byte[] COLUMN_FAMILY = "t".getBytes();
- static byte[] ANNOTATION_FAMILY = "a".getBytes();
- static byte[] KEY_NAMES = "k".getBytes();
- static byte[] CHART_TYPE = "chart_meta".getBytes();
- static byte[] CHART_FAMILY = "c".getBytes();
- static byte[] COMMON_FAMILY = "c".getBytes();
- static byte[] WIDGET_TYPE = "widget_meta".getBytes();
- static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes();
+ final static byte[] COLUMN_FAMILY = "t".getBytes(UTF8);
+ final static byte[] ANNOTATION_FAMILY = "a".getBytes(UTF8);
+ final static byte[] KEY_NAMES = "k".getBytes(UTF8);
+ final static byte[] CHART_TYPE = "chart_meta".getBytes(UTF8);
+ final static byte[] CHART_FAMILY = "c".getBytes(UTF8);
+ final static byte[] COMMON_FAMILY = "c".getBytes(UTF8);
+ final static byte[] WIDGET_TYPE = "widget_meta".getBytes(UTF8);
+ final static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes(UTF8);
private static final String CHUKWA = "chukwa";
private static final String CHUKWA_META = "chukwa_meta";
private static long MILLISECONDS_IN_DAY = 86400000L;
- protected static Connection connection = null;
+ private static Connection connection = null;
public ChukwaHBaseStore() {
super();
@@ -171,7 +173,7 @@
byte[] key = CellUtil.cloneQualifier(kv);
long timestamp = ByteBuffer.wrap(key).getLong();
double value = Double
- .parseDouble(new String(CellUtil.cloneValue(kv), "UTF-8"));
+ .parseDouble(new String(CellUtil.cloneValue(kv), UTF8));
series.add(timestamp, value);
}
}
@@ -179,7 +181,7 @@
currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
}
table.close();
- } catch (Exception e) {
+ } catch (IOException e) {
closeHBase();
LOG.error(ExceptionUtil.getStackTrace(e));
}
@@ -191,12 +193,12 @@
try {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
- Get get = new Get(metricGroup.getBytes());
+ Get get = new Get(metricGroup.getBytes(UTF8));
Result result = table.get(get);
for (Cell kv : result.rawCells()) {
- JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), "UTF-8"));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), UTF8));
if (json.get("type").equals("metric")) {
- familyNames.add(new String(CellUtil.cloneQualifier(kv), "UTF-8"));
+ familyNames.add(new String(CellUtil.cloneQualifier(kv), UTF8));
}
}
table.close();
@@ -219,7 +221,7 @@
Iterator<Result> it = rs.iterator();
while (it.hasNext()) {
Result result = it.next();
- metricGroups.add(new String(result.getRow(), "UTF-8"));
+ metricGroups.add(new String(result.getRow(), UTF8));
}
table.close();
} catch (Exception e) {
@@ -241,9 +243,9 @@
while (it.hasNext()) {
Result result = it.next();
for (Cell cell : result.rawCells()) {
- JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
if (json!=null && json.get("type")!=null && json.get("type").equals("source")) {
- pk.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+ pk.add(new String(CellUtil.cloneQualifier(cell), UTF8));
}
}
}
@@ -296,7 +298,7 @@
for(Cell cell : result.rawCells()) {
byte[] dest = new byte[5];
System.arraycopy(CellUtil.cloneRow(cell), 3, dest, 0, 5);
- String source = new String(dest);
+ String source = new String(dest, UTF8);
long time = cell.getTimestamp();
// Time display in x axis
long delta = time - startTime;
@@ -306,11 +308,11 @@
if (keyMap.containsKey(source)) {
y = keyMap.get(source);
} else {
- keyMap.put(source, new Integer(index));
+ keyMap.put(source, Integer.valueOf(index));
y = index;
index++;
}
- double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
+ double v = Double.parseDouble(new String(CellUtil.cloneValue(cell), UTF8));
heatmap.put(x, y, v);
if (v > max) {
max = v;
@@ -355,9 +357,9 @@
while (it.hasNext()) {
Result result = it.next();
for (Cell cell : result.rawCells()) {
- JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
if (json.get("type").equals("cluster")) {
- clusters.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+ clusters.add(new String(CellUtil.cloneQualifier(cell), UTF8));
}
}
}
@@ -382,10 +384,10 @@
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get get = new Get(CHART_TYPE);
Result r = table.get(get);
- byte[] value = r.getValue(CHART_FAMILY, id.getBytes());
+ byte[] value = r.getValue(CHART_FAMILY, id.getBytes(UTF8));
Gson gson = new Gson();
if(value!=null) {
- chart = gson.fromJson(new String(value), Chart.class);
+ chart = gson.fromJson(new String(value, UTF8), Chart.class);
}
table.close();
} catch (Exception e) {
@@ -408,7 +410,7 @@
Put put = new Put(CHART_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(chart);
- put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+ put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
} catch (Exception e) {
@@ -437,7 +439,7 @@
s.setLineOptions(l);
series.add(s);
}
- chart.SetSeries(series);
+ chart.setSeries(series);
return createChart(chart);
}
@@ -469,7 +471,7 @@
Put put = new Put(CHART_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(chart);
- put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+ put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
} catch (Exception e) {
@@ -499,8 +501,8 @@
}
// Figure out the time range and determine the best resolution
// to fetch the data
- long range = Math.round((endTime - startTime)
- / (MINUTES_IN_HOUR * MINUTE));
+ long range = (endTime - startTime)
+ / (long) (MINUTES_IN_HOUR * MINUTE);
long sampleRate = 1;
if (range <= 1) {
sampleRate = 5;
@@ -512,7 +514,7 @@
sampleRate = 87600;
}
double smoothing = (endTime - startTime)
- / (sampleRate * SECOND ) / RESOLUTION;
+ / (double) (sampleRate * SECOND ) / (double) RESOLUTION;
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA));
@@ -550,7 +552,7 @@
byte[] key = CellUtil.cloneQualifier(kv);
long timestamp = ByteBuffer.wrap(key).getLong();
double value = Double.parseDouble(new String(CellUtil.cloneValue(kv),
- "UTF-8"));
+ UTF8));
if(initial==0) {
filteredValue = value;
}
@@ -558,7 +560,7 @@
lastTime = timestamp;
// Determine if there is any gap, if there is gap in data, reset
// calculation.
- if (elapsedTime > sampleRate) {
+ if (elapsedTime > (sampleRate * 5)) {
filteredValue = 0.0d;
} else {
if (smoothing != 0.0d) {
@@ -587,7 +589,7 @@
list.add(clone);
}
table.close();
- } catch (Exception e) {
+ } catch (IOException|CloneNotSupportedException e) {
closeHBase();
LOG.error(ExceptionUtil.getStackTrace(e));
}
@@ -622,7 +624,7 @@
continue;
}
Gson gson = new Gson();
- Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+ Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
list.add(widget);
c++;
}
@@ -658,7 +660,7 @@
Result result = it.next();
for(Cell kv : result.rawCells()) {
Gson gson = new Gson();
- Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+ Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
list.add(widget);
}
}
@@ -683,11 +685,11 @@
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get widget = new Get(WIDGET_TYPE);
- widget.addColumn(COMMON_FAMILY, title.getBytes());
+ widget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
Result rs = table.get(widget);
- byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes());
+ byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes(UTF8));
Gson gson = new Gson();
- w = gson.fromJson(new String(buffer), Widget.class);
+ w = gson.fromJson(new String(buffer, UTF8), Widget.class);
table.close();
} catch (Exception e) {
closeHBase();
@@ -708,7 +710,7 @@
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get widgetTest = new Get(WIDGET_TYPE);
- widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes());
+ widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8));
if (table.exists(widgetTest)) {
LOG.warn("Widget: " + widget.getTitle() + " already exists.");
created = false;
@@ -716,7 +718,7 @@
Put put = new Put(WIDGET_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(widget);
- put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes());
+ put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
created = true;
}
@@ -741,12 +743,12 @@
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Delete oldWidget = new Delete(WIDGET_TYPE);
- oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+ oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
table.delete(oldWidget);
Put put = new Put(WIDGET_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(widget);
- put.addColumn(COMMON_FAMILY, title.getBytes(), buffer.getBytes());
+ put.addColumn(COMMON_FAMILY, title.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
result = true;
@@ -772,7 +774,7 @@
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Delete oldWidget = new Delete(WIDGET_TYPE);
- oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+ oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
table.delete(oldWidget);
table.close();
result = true;
@@ -790,7 +792,7 @@
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get dashboardTest = new Get(DASHBOARD_TYPE);
- dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes());
+ dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes(UTF8));
exists = table.exists(dashboardTest);
table.close();
} catch (Exception e) {
@@ -931,19 +933,19 @@
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get dashboard = new Get(DASHBOARD_TYPE);
- dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+ dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
Result rs = table.get(dashboard);
- byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes());
+ byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
if(buffer == null) {
// If user dashboard is not found, use default dashboard.
key = new StringBuilder().append(id).append("|").toString();
dashboard = new Get(DASHBOARD_TYPE);
- dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+ dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
rs = table.get(dashboard);
- buffer = rs.getValue(COMMON_FAMILY, key.getBytes());
+ buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
}
Gson gson = new Gson();
- dash = gson.fromJson(new String(buffer), Dashboard.class);
+ dash = gson.fromJson(new String(buffer, UTF8), Dashboard.class);
table.close();
} catch (Exception e) {
closeHBase();
@@ -964,7 +966,7 @@
Put put = new Put(DASHBOARD_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(dash);
- put.addColumn(COMMON_FAMILY, key.getBytes(), buffer.getBytes());
+ put.addColumn(COMMON_FAMILY, key.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
result = true;
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
index eb79cd7..4f5f289 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.chukwa.extraction.hbase;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -37,7 +38,7 @@
protected String sourceHelper;
protected byte[] key = null;
- byte[] CF = "t".getBytes();
+ byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
boolean chunkInErrorSaved = false;
ArrayList<Put> output = null;
@@ -70,14 +71,14 @@
byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
Put put = new Put(key);
byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add(CF, timeInBytes, time, value);
+ put.addColumn(CF, timeInBytes, time, value);
output.add(put);
reporter.putMetric(chunk.getDataType(), primaryKey);
reporter.putSource(chunk.getDataType(), source);
}
public void addRecord(String primaryKey, String value) {
- addRecord(primaryKey, value.getBytes());
+ addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
}
/**
@@ -96,7 +97,7 @@
byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
Put put = new Put(key);
byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add(CF, timeInBytes, time, value);
+ put.addColumn(CF, timeInBytes, time, value);
output.add(put);
reporter.putMetric(chunk.getDataType(), primaryKey);
}
@@ -126,7 +127,7 @@
Put put = new Put(key);
String family = "a";
byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+ put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8")));
output.add(put);
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
index 2da64a3..483ac71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -17,34 +17,46 @@
*/
package org.apache.hadoop.chukwa.extraction.hbase;
+import java.lang.reflect.Type;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
public class DefaultProcessor extends AbstractProcessor {
-
+
public DefaultProcessor() throws NoSuchAlgorithmException {
- super();
- // TODO Auto-generated constructor stub
+ super();
+ // TODO Auto-generated constructor stub
}
-static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+ static Logger LOG = Logger.getLogger(DefaultProcessor.class);
@Override
protected void parse(byte[] recordEntry) throws Throwable {
- byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
- Put put = new Put(key);
- byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add("t".getBytes(), timeInBytes, chunk.getData());
- output.add(put);
- JSONObject json = new JSONObject();
- json.put("sig", key);
- json.put("type", "unknown");
- reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+ byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(),
+ chunk.getSource());
+ Put put = new Put(key);
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+ put.addColumn("t".getBytes(Charset.forName("UTF-8")), timeInBytes,
+ chunk.getData());
+ output.add(put);
+ Type defaultType = new TypeToken<Map<String, String>>() {
+ }.getType();
+ Gson gson = new Gson();
+ Map<String, String> meta = new HashMap<String, String>();
+ meta.put("sig", new String(key, Charset.forName("UTF-8")));
+ meta.put("type", "unknown");
+ String buffer = gson.toJson(meta, defaultType);
+ reporter.put(chunk.getDataType(), chunk.getSource(), buffer);
}
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
index 19df607..de64a0d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.extraction.hbase;
-
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
-import org.apache.hadoop.chukwa.util.HBaseUtil;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -37,15 +37,14 @@
static final String recordNameField = "recordName";
static final String hostName = "Hostname";
static final String processName = "ProcessName";
- static final byte[] cf = "t".getBytes();
+ static final byte[] cf = "t".getBytes(Charset.forName("UTF-8"));
public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
}
@Override
protected void parse(byte[] recordEntry) throws Throwable {
- try {
- String body = new String(recordEntry);
+ String body = new String(recordEntry, Charset.forName("UTF-8"));
int start = body.indexOf('{');
JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
@@ -56,10 +55,8 @@
if(json.get(processName)!=null) {
src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
}
- @SuppressWarnings("unchecked")
- Iterator<String> ki = json.keySet().iterator();
- while (ki.hasNext()) {
- String keyName = ki.next();
+ for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+ String keyName = entry.getKey();
if (timestampField.intern() == keyName.intern()) {
continue;
} else if (contextNameField.intern() == keyName.intern()) {
@@ -71,20 +68,14 @@
} else if (processName.intern() == keyName.intern()) {
continue;
} else {
- if (json.get(keyName) != null) {
- String v = json.get(keyName).toString();
+ if(json.get(keyName)!=null) {
+ String v = entry.getValue().toString();
String primaryKey = new StringBuilder(contextName).append(".")
.append(recordName).append(".").append(keyName).toString();
- addRecord(time, primaryKey, src, v.getBytes(), output);
+ addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
}
}
}
-
- } catch (Exception e) {
- LOG.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
- e);
- throw e;
- }
}
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
index dcbe2d4..0682c71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -22,43 +22,43 @@
import java.util.Date;
public class LogEntry {
- private final static SimpleDateFormat sdf = new SimpleDateFormat(
- "yyyy-MM-dd HH:mm");
+ private SimpleDateFormat sdf = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm");
- private Date date;
- private String logLevel;
- private String className;
- private String body;
+ private Date date;
+ private String logLevel;
+ private String className;
+ private String body;
- public LogEntry(String recordEntry) throws ParseException {
- String dStr = recordEntry.substring(0, 23);
- date = sdf.parse(dStr);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- logLevel = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- className = recordEntry.substring(start, idx - 1);
- body = recordEntry.substring(idx + 1);
- }
+ public LogEntry(String recordEntry) throws ParseException {
+ String dStr = recordEntry.substring(0, 23);
+ date = sdf.parse(dStr);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ logLevel = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ className = recordEntry.substring(start, idx - 1);
+ body = recordEntry.substring(idx + 1);
+ }
- public Date getDate() {
- return date;
- }
+ public Date getDate() {
+ return (Date) date.clone();
+ }
- public void setDate(Date date) {
- this.date = date;
- }
+ public void setDate(Date date) {
+ this.date = (Date) date.clone();
+ }
- public String getLogLevel() {
- return logLevel;
- }
+ public String getLogLevel() {
+ return logLevel;
+ }
- public String getClassName() {
- return className;
- }
+ public String getClassName() {
+ return className;
+ }
- public String getBody() {
- return body;
- }
+ public String getBody() {
+ return body;
+ }
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
index c2695f2..3718fbd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -22,20 +22,14 @@
*/
package org.apache.hadoop.chukwa.extraction.hbase;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Iterator;
-import java.util.TimeZone;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
-import org.apache.hadoop.chukwa.Chunk;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -48,7 +42,7 @@
@Override
protected void parse(byte[] recordEntry) throws Throwable {
- String buffer = new String(recordEntry);
+ String buffer = new String(recordEntry, Charset.forName("UTF-8"));
JSONObject json = (JSONObject) JSONValue.parse(buffer);
time = ((Long) json.get("timestamp")).longValue();
ChukwaRecord record = new ChukwaRecord();
@@ -70,11 +64,9 @@
user = user + Double.parseDouble(cpu.get("user").toString());
sys = sys + Double.parseDouble(cpu.get("sys").toString());
idle = idle + Double.parseDouble(cpu.get("idle").toString());
- @SuppressWarnings("unchecked")
- Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
- while(iterator.hasNext()) {
- String key = iterator.next();
- addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+ for(Entry<String, Object> entry : (Set<Map.Entry>) cpu.entrySet()) {
+ String key = entry.getKey();
+ addRecord("cpu." + key + "." + i, String.valueOf(entry.getValue()));
}
}
combined = combined / actualSize;
@@ -94,20 +86,15 @@
record = new ChukwaRecord();
JSONObject memory = (JSONObject) json.get("memory");
- @SuppressWarnings("unchecked")
- Iterator<String> memKeys = memory.keySet().iterator();
- while (memKeys.hasNext()) {
- String key = memKeys.next();
- addRecord("memory." + key, memory.get(key).toString());
+ for(Entry<String, Object> entry : (Set<Map.Entry>) memory.entrySet()) {
+ String key = entry.getKey();
+ addRecord("memory." + key, String.valueOf(entry.getValue()));
}
record = new ChukwaRecord();
JSONObject swap = (JSONObject) json.get("swap");
- @SuppressWarnings("unchecked")
- Iterator<String> swapKeys = swap.keySet().iterator();
- while (swapKeys.hasNext()) {
- String key = swapKeys.next();
- addRecord("swap." + key, swap.get(key).toString());
+ for(Map.Entry<String, Object> entry : (Set<Map.Entry>) swap.entrySet()) {
+ addRecord("swap." + entry.getKey(), String.valueOf(entry.getValue()));
}
double rxBytes = 0;
@@ -122,28 +109,30 @@
JSONArray netList = (JSONArray) json.get("network");
for (int i = 0; i < netList.size(); i++) {
JSONObject netIf = (JSONObject) netList.get(i);
- @SuppressWarnings("unchecked")
- Iterator<String> keys = netIf.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, netIf.get(key).toString());
+ for(Map.Entry<String, Object> entry : (Set<Map.Entry>) netIf.entrySet()) {
+ String key = entry.getKey();
+ long value = 0;
+ if(entry.getValue() instanceof Long) {
+ value = (Long) entry.getValue();
+ }
+ record.add(key + "." + i, String.valueOf(entry.getValue()));
if (i != 0) {
if (key.equals("RxBytes")) {
- rxBytes = rxBytes + (Long) netIf.get(key);
+ rxBytes = rxBytes + value;
} else if (key.equals("RxDropped")) {
- rxDropped = rxDropped + (Long) netIf.get(key);
+ rxDropped = rxDropped + value;
} else if (key.equals("RxErrors")) {
- rxErrors = rxErrors + (Long) netIf.get(key);
+ rxErrors = rxErrors + value;
} else if (key.equals("RxPackets")) {
- rxPackets = rxPackets + (Long) netIf.get(key);
+ rxPackets = rxPackets + value;
} else if (key.equals("TxBytes")) {
- txBytes = txBytes + (Long) netIf.get(key);
+ txBytes = txBytes + value;
} else if (key.equals("TxCollisions")) {
- txCollisions = txCollisions + (Long) netIf.get(key);
+ txCollisions = txCollisions + value;
} else if (key.equals("TxErrors")) {
- txErrors = txErrors + (Long) netIf.get(key);
+ txErrors = txErrors + value;
} else if (key.equals("TxPackets")) {
- txPackets = txPackets + (Long) netIf.get(key);
+ txPackets = txPackets + value;
}
}
}
@@ -168,22 +157,25 @@
JSONArray diskList = (JSONArray) json.get("disk");
for (int i = 0; i < diskList.size(); i++) {
JSONObject disk = (JSONObject) diskList.get(i);
- Iterator<String> keys = disk.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, disk.get(key).toString());
+ for(Entry<String, Object> entry : (Set<Map.Entry>) disk.entrySet()) {
+ String key = entry.getKey();
+ long value = 0;
+ if(entry.getValue() instanceof Long) {
+ value = (Long) entry.getValue();
+ }
+ record.add(key + "." + i, String.valueOf(entry.getValue()));
if (key.equals("ReadBytes")) {
- readBytes = readBytes + (Long) disk.get("ReadBytes");
+ readBytes = readBytes + value;
} else if (key.equals("Reads")) {
- reads = reads + (Long) disk.get("Reads");
+ reads = reads + Long.valueOf(value);;
} else if (key.equals("WriteBytes")) {
- writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+ writeBytes = writeBytes + value;
} else if (key.equals("Writes")) {
- writes = writes + (Long) disk.get("Writes");
+ writes = writes + value;
} else if (key.equals("Total")) {
- total = total + (Long) disk.get("Total");
+ total = total + value;
} else if (key.equals("Used")) {
- used = used + (Long) disk.get("Used");
+ used = used + value;
}
}
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
index 9c21bf1..02fe3b7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
@@ -20,21 +20,20 @@
import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
public class ClusterConfig {
- private static Set<String> clusterMap = null;
+ private Set<String> clusterMap = null;
static public String getContents(File aFile) {
// ...checks on aFile are elided
StringBuffer contents = new StringBuffer();
try {
- // use buffering, reading one line at a time
- // FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
+ BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
try {
String line = null; // not declared within while loop
/*
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
index 2d84c09..fe90941 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
@@ -24,6 +24,7 @@
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -124,12 +125,12 @@
StringBuilder sb = new StringBuilder();
String line = null;
try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
while ((line = reader.readLine()) != null) {
sb.append(line + "\n");
}
FSDataOutputStream out = fs.create(dest);
- out.write(sb.toString().getBytes());
+ out.write(sb.toString().getBytes(Charset.forName("UTF-8")));
out.close();
reader.close();
} catch(IOException e) {
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
index 7c8c6a7..19cde5f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
@@ -111,7 +111,7 @@
return this.id;
}
- public void SetSeries(List<SeriesMetaData> series) {
+ public void setSeries(List<SeriesMetaData> series) {
this.series = series;
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
index 61dc0b5..74d5e89 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
@@ -89,11 +89,11 @@
}
public String[] getTokens() {
- return tokens;
+ return tokens.clone();
}
public void setTokens(String[] tokens) {
- this.tokens = tokens;
+ this.tokens = tokens.clone();
}
public void tokenize() {
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
index c9413bd..869efa4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
@@ -24,6 +24,7 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
+import java.nio.charset.Charset;
import java.util.Map;
import javax.servlet.ServletException;
@@ -44,11 +45,11 @@
private final String USER_AGENT = "Mozilla/5.0";
private final static String SOLR_URL = "chukwa.solr.url";
private final static Logger LOG = Logger.getLogger(HttpProxy.class);
- private ChukwaConfiguration conf = new ChukwaConfiguration();
private String solrUrl = null;
public HttpProxy() {
super();
+ ChukwaConfiguration conf = new ChukwaConfiguration();
solrUrl = conf.get(SOLR_URL);
}
@@ -72,7 +73,7 @@
LOG.info("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(
- con.getInputStream()));
+ con.getInputStream(), Charset.forName("UTF-8")));
String inputLine;
StringBuffer response1 = new StringBuffer();
@@ -80,7 +81,7 @@
while ((inputLine = in.readLine()) != null) {
response1.append(inputLine);
- sout.write(inputLine.getBytes());
+ sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
}
in.close();
@@ -131,7 +132,7 @@
LOG.debug("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(
- con.getInputStream()));
+ con.getInputStream(), Charset.forName("UTF-8")));
String inputLine;
StringBuffer response1 = new StringBuffer();
@@ -139,7 +140,7 @@
while ((inputLine = in.readLine()) != null) {
response1.append(inputLine);
- sout.write(inputLine.getBytes());
+ sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
}
in.close();
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
index ceed0df..1441253 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
@@ -20,6 +20,8 @@
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
@@ -63,8 +65,8 @@
Gson gson = new Gson();
Type stringStringMap = new TypeToken<Map<String, String>>(){}.getType();
Map<String,String> map = gson.fromJson(buffer, stringStringMap);
- for(String key : map.keySet()) {
- request.getSession().setAttribute(key, map.get(key));
+ for(Entry<String, String> entry : (Set<Map.Entry<String, String>>) map.entrySet()) {
+ request.getSession().setAttribute(entry.getKey(), entry.getValue());
}
return Response.ok().build();
}
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
index ea07797..4524922 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
@@ -39,7 +39,7 @@
private VelocityEngine ve;
private static Logger LOG = Logger.getLogger(VelocityResolver.class);
- public static String LOGGER_NAME = VelocityResolver.class.getName();
+ public final static String LOGGER_NAME = VelocityResolver.class.getName();
/**
* Jersey configuration for setting up Velocity configuration.
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
deleted file mode 100644
index f0a3303..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.sql.SQLException;
-import java.util.Calendar;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Timer;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueInfoProcessor {
-
- private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
-
- private int intervalValue = 60;
- private String torqueServer = null;
- private String torqueBinDir = null;
- private String domain = null;
-
- private TreeMap<String, TreeMap<String, String>> currentHodJobs;
-
- public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
- this.intervalValue = interval;
-
- torqueServer = System.getProperty("TORQUE_SERVER");
- torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
- domain = System.getProperty("DOMAIN");
- currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
- }
-
- public void setup(boolean recover) throws Exception {
- }
-
- private void getHodJobInfo() throws IOException {
- StringBuffer sb = new StringBuffer();
- sb.append(torqueBinDir).append("/qstat -a");
-
- String[] getQueueInfoCommand = new String[3];
- getQueueInfoCommand[0] = "ssh";
- getQueueInfoCommand[1] = torqueServer;
- getQueueInfoCommand[2] = sb.toString();
-
- String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
- + " " + getQueueInfoCommand[2];
- ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
-
- Process p = pb.start();
-
- Timer timeout = new Timer();
- TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
- BufferedReader result = new BufferedReader(new InputStreamReader(p
- .getInputStream()));
- ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
- command, true);
- errorHandler.start();
-
- String line = null;
- boolean start = false;
- TreeSet<String> jobsInTorque = new TreeSet<String>();
- while ((line = result.readLine()) != null) {
- if (line.startsWith("---")) {
- start = true;
- continue;
- }
-
- if (start) {
- String[] items = line.split("\\s+");
- if (items.length >= 10) {
- String hodIdLong = items[0];
- String hodId = hodIdLong.split("[.]")[0];
- String userId = items[1];
- String numOfMachine = items[5];
- String status = items[9];
- jobsInTorque.add(hodId);
- if (!currentHodJobs.containsKey(hodId)) {
- TreeMap<String, String> aJobData = new TreeMap<String, String>();
-
- aJobData.put("userId", userId);
- aJobData.put("numOfMachine", numOfMachine);
- aJobData.put("traceCheckCount", "0");
- aJobData.put("process", "0");
- aJobData.put("status", status);
- currentHodJobs.put(hodId, aJobData);
- } else {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- aJobData.put("status", status);
- currentHodJobs.put(hodId, aJobData);
- }// if..else
- }
- }
- }// while
-
- try {
- errorHandler.join();
- } catch (InterruptedException ie) {
- log.error(ie.getMessage());
- }
- timeout.cancel();
-
- Set<String> currentHodJobIds = currentHodJobs.keySet();
- Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
- TreeSet<String> finishedHodIds = new TreeSet<String>();
- while (currentHodJobIdsIt.hasNext()) {
- String hodId = currentHodJobIdsIt.next();
- if (!jobsInTorque.contains(hodId)) {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String process = aJobData.get("process");
- if (process.equals("0") || process.equals("1")) {
- aJobData.put("status", "C");
- } else {
- finishedHodIds.add(hodId);
- }
- }
- }// while
-
- Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
- while (finishedHodIdsIt.hasNext()) {
- String hodId = finishedHodIdsIt.next();
- currentHodJobs.remove(hodId);
- }
-
- }
-
- private boolean loadQstatData(String hodId) throws IOException, SQLException {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String userId = aJobData.get("userId");
-
- StringBuffer sb = new StringBuffer();
- sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
- String[] qstatCommand = new String[3];
- qstatCommand[0] = "ssh";
- qstatCommand[1] = torqueServer;
- qstatCommand[2] = sb.toString();
-
- String command = qstatCommand[0] + " " + qstatCommand[1] + " "
- + qstatCommand[2];
- ProcessBuilder pb = new ProcessBuilder(qstatCommand);
- Process p = pb.start();
-
- Timer timeout = new Timer();
- TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
- BufferedReader result = new BufferedReader(new InputStreamReader(p
- .getInputStream()));
- ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
- command, false);
- errorHandler.start();
- String line = null;
- String hosts = null;
- long startTimeValue = -1;
- long endTimeValue = Calendar.getInstance().getTimeInMillis();
- long executeTimeValue = Calendar.getInstance().getTimeInMillis();
- boolean qstatfinished;
-
- while ((line = result.readLine()) != null) {
- if (line.indexOf("ctime") >= 0) {
- String startTime = line.split("=")[1].trim();
- // Tue Sep 9 23:44:29 2008
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date startTimeDate;
- try {
- startTimeDate = sdf.parse(startTime);
- startTimeValue = startTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("mtime") >= 0) {
- String endTime = line.split("=")[1].trim();
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date endTimeDate;
- try {
- endTimeDate = sdf.parse(endTime);
- endTimeValue = endTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("etime") >= 0) {
- String executeTime = line.split("=")[1].trim();
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date executeTimeDate;
- try {
- executeTimeDate = sdf.parse(executeTime);
- executeTimeValue = executeTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("exec_host") >= 0) {
- hosts = line.split("=")[1].trim();
- }
- }
-
- if (hosts != null && startTimeValue >= 0) {
- String[] items2 = hosts.split("[+]");
- int num = 0;
- for (int i = 0; i < items2.length; i++) {
- String machinetmp = items2[i];
- if (machinetmp.length() > 3) {
- String machine = items2[i].substring(0, items2[i].length() - 2);
- StringBuffer data = new StringBuffer();
- data.append("HodId=").append(hodId);
- data.append(", Machine=").append(machine);
- if (domain != null) {
- data.append(".").append(domain);
- }
- log.info(data);
- num++;
- }
- }
- Timestamp startTimedb = new Timestamp(startTimeValue);
- Timestamp endTimedb = new Timestamp(endTimeValue);
- StringBuffer data = new StringBuffer();
- long timeQueued = executeTimeValue - startTimeValue;
- data.append("HodID=").append(hodId);
- data.append(", UserId=").append(userId);
- data.append(", StartTime=").append(startTimedb);
- data.append(", TimeQueued=").append(timeQueued);
- data.append(", NumOfMachines=").append(num);
- data.append(", EndTime=").append(endTimedb);
- log.info(data);
- qstatfinished = true;
-
- } else {
-
- qstatfinished = false;
- }
-
- try {
- errorHandler.join();
- } catch (InterruptedException ie) {
- log.error(ie.getMessage());
- }
- result.close();
- timeout.cancel();
-
- return qstatfinished;
- }
-
- private boolean loadTraceJobData(String hodId) throws IOException,
- SQLException {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String userId = aJobData.get("userId");
-
- StringBuffer sb = new StringBuffer();
- sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
- String[] traceJobCommand = new String[3];
- traceJobCommand[0] = "ssh";
- traceJobCommand[1] = torqueServer;
- traceJobCommand[2] = sb.toString();
-
- String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
- + traceJobCommand[2];
- ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
-
- Process p = pb.start();
-
- Timer timeout = new Timer();
- TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
- BufferedReader result = new BufferedReader(new InputStreamReader(p
- .getInputStream()));
- ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
- command, false);
- errorHandler.start();
- String line = null;
- String exit_status = null;
- String hosts = null;
- long timeQueued = -1;
- long startTimeValue = -1;
- long endTimeValue = -1;
- boolean findResult = false;
-
- while ((line = result.readLine()) != null && !findResult) {
- if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
- && line.indexOf("qtime") >= 0) {
- TreeMap<String, String> jobData = new TreeMap<String, String>();
- String[] items = line.split("\\s+");
- for (int i = 0; i < items.length; i++) {
- String[] items2 = items[i].split("=");
- if (items2.length >= 2) {
- jobData.put(items2[0], items2[1]);
- }
-
- }
- String startTime = jobData.get("ctime");
- startTimeValue = Long.valueOf(startTime);
- startTimeValue = startTimeValue - startTimeValue % (60);
- Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
-
- String queueTime = jobData.get("qtime");
- long queueTimeValue = Long.valueOf(queueTime);
-
- String sTime = jobData.get("start");
- long sTimeValue = Long.valueOf(sTime);
-
- timeQueued = sTimeValue - queueTimeValue;
-
- String endTime = jobData.get("end");
- endTimeValue = Long.valueOf(endTime);
- endTimeValue = endTimeValue - endTimeValue % (60);
- Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
-
- exit_status = jobData.get("Exit_status");
- hosts = jobData.get("exec_host");
- String[] items2 = hosts.split("[+]");
- int num = 0;
- for (int i = 0; i < items2.length; i++) {
- String machinetemp = items2[i];
- if (machinetemp.length() >= 3) {
- String machine = items2[i].substring(0, items2[i].length() - 2);
- StringBuffer data = new StringBuffer();
- data.append("HodId=").append(hodId);
- data.append(", Machine=").append(machine);
- if (domain != null) {
- data.append(".").append(domain);
- }
- log.info(data.toString());
- num++;
- }
- }
-
- StringBuffer data = new StringBuffer();
- data.append("HodID=").append(hodId);
- data.append(", UserId=").append(userId);
- data.append(", Status=").append(exit_status);
- data.append(", TimeQueued=").append(timeQueued);
- data.append(", StartTime=").append(startTimedb);
- data.append(", EndTime=").append(endTimedb);
- data.append(", NumOfMachines=").append(num);
- log.info(data.toString());
- findResult = true;
- log.debug(" hod info for job " + hodId + " has been loaded ");
- }// if
-
- }// while
-
- try {
- errorHandler.join();
- } catch (InterruptedException ie) {
- log.error(ie.getMessage());
- }
-
- timeout.cancel();
- boolean tracedone = false;
- if (!findResult) {
-
- String traceCheckCount = aJobData.get("traceCheckCount");
- int traceCheckCountValue = Integer.valueOf(traceCheckCount);
- traceCheckCountValue = traceCheckCountValue + 1;
- aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
-
- log.debug("did not find tracejob info for job " + hodId + ", after "
- + traceCheckCountValue + " times checking");
- if (traceCheckCountValue >= 2) {
- tracedone = true;
- }
- }
- boolean finished = findResult | tracedone;
- return finished;
- }
-
- private void process_data() throws SQLException {
-
- long currentTime = System.currentTimeMillis();
- currentTime = currentTime - currentTime % (60 * 1000);
-
- Set<String> hodIds = currentHodJobs.keySet();
-
- Iterator<String> hodIdsIt = hodIds.iterator();
- while (hodIdsIt.hasNext()) {
- String hodId = hodIdsIt.next();
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String status = aJobData.get("status");
- String process = aJobData.get("process");
- if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
- try {
- boolean result = loadQstatData(hodId);
- if (result) {
- aJobData.put("process", "1");
- currentHodJobs.put(hodId, aJobData);
- }
- } catch (IOException ioe) {
- log.error("load qsat data Error:" + ioe.getMessage());
-
- }
- }
- if (!process.equals("2") && status.equals("C")) {
- try {
- boolean result = loadTraceJobData(hodId);
-
- if (result) {
- aJobData.put("process", "2");
- currentHodJobs.put(hodId, aJobData);
- }
- } catch (IOException ioe) {
- log.error("loadTraceJobData Error:" + ioe.getMessage());
- }
- }// if
-
- } // while
-
- }
-
- private void handle_jobData() throws SQLException {
- try {
- getHodJobInfo();
- } catch (IOException ex) {
- log.error("getQueueInfo Error:" + ex.getMessage());
- return;
- }
- try {
- process_data();
- } catch (SQLException ex) {
- log.error("process_data Error:" + ex.getMessage());
- throw ex;
- }
- }
-
- public void run_forever() throws SQLException {
- while (true) {
- handle_jobData();
- try {
- log.debug("sleeping ...");
- Thread.sleep(this.intervalValue * 1000);
- } catch (InterruptedException e) {
- log.error(e.getMessage());
- }
- }
- }
-
- public void shutdown() {
- }
-}
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
deleted file mode 100644
index 8ea645e..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.util.TimerTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueTimerTask extends TimerTask {
- private Process ps = null;
- private String command;
-
- private static Log log = LogFactory.getLog(TorqueTimerTask.class);
- // public static int timeoutInterval=300;
- public static int timeoutInterval = 180;
-
- public TorqueTimerTask() {
- super();
- // TODO Auto-generated constructor stub
- }
-
- public TorqueTimerTask(Process process, String command) {
- super();
- this.ps = process;
- this.command = command;
-
- }
-
- public void run() {
- ps.destroy();
- log.error("torque command: " + command + " timed out");
-
- }
-
-}
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
index 7ed3148..ff2a022 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
@@ -20,10 +20,11 @@
import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
public class ClusterConfig {
- public static final HashMap<String, String> clusterMap = new HashMap<String, String>();
+ private HashMap<String, String> clusterMap = new HashMap<String, String>();
private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator;
static public String getContents(File aFile) {
@@ -31,9 +32,7 @@
StringBuffer contents = new StringBuffer();
try {
- // use buffering, reading one line at a time
- // FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
+ BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
try {
String line = null; // not declared within while loop
/*
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
index c655e24..70a80c0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.chukwa.util;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Calendar;
@@ -29,7 +30,6 @@
public class HBaseUtil {
private static Logger LOG = Logger.getLogger(HBaseUtil.class);
- static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
static MessageDigest md5 = null;
static {
try {
@@ -50,8 +50,9 @@
}
public static byte[] buildKey(long time, String primaryKey) {
+ Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
c.setTimeInMillis(time);
- byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+ byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
byte[] pk = getHash(primaryKey);
byte[] key = new byte[12];
System.arraycopy(day, 0, key, 0, day.length);
@@ -60,8 +61,9 @@
}
public static byte[] buildKey(long time, String primaryKey, String source) {
+ Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
c.setTimeInMillis(time);
- byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+ byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
byte[] pk = getHash(primaryKey);
byte[] src = getHash(source);
byte[] key = new byte[12];
@@ -73,7 +75,7 @@
private static byte[] getHash(String key) {
byte[] hash = new byte[5];
- System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5);
+ System.arraycopy(md5.digest(key.getBytes(Charset.forName("UTF-8"))), 0, hash, 0, 5);
return hash;
}
}
diff --git a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
index 26e4beb..c1cf37f 100644
--- a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
+++ b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
@@ -43,26 +43,6 @@
}
}
- public void testWrongVersion() {
- ChunkBuilder cb = new ChunkBuilder();
- cb.addRecord("foo".getBytes());
- cb.addRecord("bar".getBytes());
- cb.addRecord("baz".getBytes());
- Chunk c = cb.getChunk();
- DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
- try {
- c.write(ob);
- DataInputBuffer ib = new DataInputBuffer();
- ib.reset(ob.getData(), c.getSerializedSizeEstimate());
- // change current chunkImpl version
- ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1;
- ChunkImpl.read(ib);
- fail("Should have raised an IOexception");
- } catch (IOException e) {
- // right behavior, do nothing
- }
- }
-
public void testTag() {
ChunkBuilder cb = new ChunkBuilder();
cb.addRecord("foo".getBytes());
diff --git a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
index 93500ff..99d8dc1 100644
--- a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
+++ b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
@@ -172,7 +172,8 @@
conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
conf.set("chukwaAgent.checkpoint.interval", "10000");
int portno = conf.getInt("chukwaAgent.control.port", agentPort);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent();
+ agent.start();
conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
conn.start();
sender = new ChukwaHttpSender(conf);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
index a039bc6..5b163c9 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
@@ -59,7 +59,8 @@
conf.setInt("chukwaAgent.http.port", 9090);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
}
public void testJMXAdaptor() {
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
index 367484e..39f151b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
@@ -57,7 +57,8 @@
conf.setInt("chukwaAgent.http.port", 9090);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.adaptorCount());
System.out.println("adding jmx adaptor");
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
index 466053b..948ec5a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
@@ -63,7 +63,8 @@
public void resendAfterStop(String adaptor) throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
String ADAPTORID = "adaptor_test" + System.currentTimeMillis();
String STR = "test data";
int PORTNO = 9878;
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
index 717125b..5748c9b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
@@ -56,7 +56,8 @@
conf.setInt("chukwaAgent.control.port", 0);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File emptyDir = new File(baseDir, "emptyDir2");
createEmptyDir(emptyDir);
@@ -90,7 +91,8 @@
anOldFile.deleteOnExit();
aNewFile.deleteOnExit();
anOldFile.setLastModified(10);//just after epoch
- agent = new ChukwaAgent(conf); //restart agent.
+ agent = ChukwaAgent.getAgent(conf); //restart agent.
+ agent.start();
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertTrue(aNewFile.exists());
@@ -135,7 +137,8 @@
while(retry) {
try {
retry = false;
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
} catch(Exception e) {
retry = true;
}
@@ -167,11 +170,12 @@
anOldFile.deleteOnExit();
aNewFile.deleteOnExit();
anOldFile.setLastModified(10);//just after epoch
- agent = new ChukwaAgent(conf); //restart agent.
-
+ agent = ChukwaAgent.getAgent(conf); //restart agent.
+ agent.start();
+
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertTrue(aNewFile.exists());
-
+
//make sure we started tailing the new, not the old, file.
for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
System.out.println(adaptors.getKey() +": " + adaptors.getValue());
@@ -182,7 +186,7 @@
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertEquals(4, agent.adaptorCount());
agent.shutdown();
-
+
nukeDirContents(checkpointDir);//nuke dir
checkpointDir.delete();
emptyDir.delete();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
index 11a084a..1af52d0 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
@@ -35,7 +35,8 @@
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
}
@Override
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
index eafa12d..fbe249a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
@@ -55,7 +55,8 @@
public void testOnce() throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.adaptorCount());
@@ -75,7 +76,8 @@
ChukwaAgent.AlreadyRunningException, InterruptedException {
int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
for(int i=0; i < tests; ++i) {
if(i % 100 == 0)
System.out.println("buzzed " + i + " times");
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
index bcd940c..4bbf206 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
@@ -24,6 +24,7 @@
import java.net.Socket;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
@@ -34,8 +35,9 @@
public class TestHeartbeatAdaptor extends TestCase {
private volatile boolean shutdown = false;
private final int port = 4321;
- public void testPingAdaptor() throws IOException, InterruptedException{
+ public void testPingAdaptor() throws IOException, InterruptedException, AlreadyRunningException{
ChukwaAgent agent = ChukwaAgent.getAgent();
+ agent.start();
Configuration conf = agent.getConfiguration();
conf.set("chukwa.http.writer.host", "localhost");
conf.set("chukwa.http.writer.port", String.valueOf(port));
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
index 1e0f234..65add51 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
@@ -44,7 +44,8 @@
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File testFile = makeTestFile("chukwaTest", 80,baseDir);
String adaptorId = agent
.processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
index 1c68a1a..fc04f25 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
@@ -36,7 +36,8 @@
try {
Configuration conf = new ChukwaConfiguration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
@@ -79,7 +80,8 @@
Configuration conf = new ChukwaConfiguration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
// Remove any adaptor left over from previous run
ChukwaAgentController cli = new ChukwaAgentController("localhost", agent.getControllerPort());
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
index 570e7f4..d622b5d 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
@@ -49,7 +49,8 @@
ChukwaConfiguration cc = new ChukwaConfiguration();
cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 18); // small in order to have hasMoreData=true
// (with 26 letters we should have 2 chunks)
- agent = new ChukwaAgent(cc);
+ agent = ChukwaAgent.getAgent(cc);
+ agent.start();
ChunkCatcherConnector chunks = new ChunkCatcherConnector();
chunks.start();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
index 40479b5..2a82e79 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
@@ -50,7 +50,8 @@
ChukwaConfiguration cc = new ChukwaConfiguration();
cc.set("chukwaAgent.control.port", "0");
cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
- ChukwaAgent agent = new ChukwaAgent(cc);
+ ChukwaAgent agent = ChukwaAgent.getAgent(cc);
+ agent.start();
int portno = agent.getControllerPort();
while (portno == -1) {
Thread.sleep(1000);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
index fbbfd94..4590ef3 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
@@ -59,7 +59,8 @@
*/
@Before
public void setUp() throws Exception {
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
chunks = new ChunkCatcherConnector();
chunks.start();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
index a71790d..3bda707 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
-
import java.io.*;
import junit.framework.TestCase;
@@ -36,7 +35,7 @@
ChunkCatcherConnector chunks;
Configuration conf = new Configuration();
File baseDir, testFile;
-
+
public TestFileTailingAdaptors() throws IOException {
chunks = new ChunkCatcherConnector();
chunks.start();
@@ -46,13 +45,14 @@
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
conf.set("chukwaAgent.control.port", "0");
- testFile = makeTestFile("chukwaCrSepTest", 80,baseDir);
+ testFile = makeTestFile("chukwaCrSepTest", 80, baseDir);
}
public void testCrSepAdaptor() throws IOException, InterruptedException,
ChukwaAgent.AlreadyRunningException {
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
// Remove any adaptor left over from previous run
// sleep for some time to make sure we don't get chunk from existing streams
@@ -84,81 +84,90 @@
agent.shutdown();
Thread.sleep(2000);
}
-
- public void testRepeatedlyOnBigFile() throws IOException,
- ChukwaAgent.AlreadyRunningException, InterruptedException {
- int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
- ChukwaAgent agent = new ChukwaAgent(conf);
- for(int i=0; i < tests; ++i) {
- if(i % 100 == 0)
+ public void testRepeatedlyOnBigFile() throws IOException,
+ ChukwaAgent.AlreadyRunningException, InterruptedException {
+ int tests = 10; // SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
+
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
+ for (int i = 0; i < tests; ++i) {
+ if (i % 100 == 0)
System.out.println("buzzed " + i + " times");
-
+
assertEquals(0, agent.adaptorCount());
- agent.processAddCommand("add adaptor_test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
+ agent
+ .processAddCommand("add adaptor_test = filetailer.FileTailingAdaptor raw "
+ + testFile.getCanonicalPath() + " 0");
assertEquals(1, agent.adaptorCount());
Chunk c = chunks.waitForAChunk();
String dat = new String(c.getData());
assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));
assertTrue(c.getDataType().equals("raw"));
- if(agent.adaptorCount() > 0)
+ if (agent.adaptorCount() > 0)
agent.stopAdaptor("adaptor_test", false);
}
agent.shutdown();
}
-
- public void testOffsetInAdaptorName() throws IOException, ChukwaAgent.AlreadyRunningException,
- InterruptedException{
- File testFile = makeTestFile("foo", 120,baseDir);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ public void testOffsetInAdaptorName() throws IOException,
+ ChukwaAgent.AlreadyRunningException, InterruptedException {
+ File testFile = makeTestFile("foo", 120, baseDir);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.adaptorCount());
- agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
+ agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw "
+ + testFile.getCanonicalPath() + " 0");
assertEquals(1, agent.adaptorCount());
Thread.sleep(2000);
- agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
+ agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw "
+ + testFile.getCanonicalPath() + " 0");
assertEquals(1, agent.adaptorCount());
chunks.clear();
agent.shutdown();
}
-
+
/**
- * Test that LWFTAdaptor updates lastSlurpTime so that FileTailingAdaptor
- * does not trigger an infinite loop and that slurp() is not called by
+ * Test that LWFTAdaptor updates lastSlurpTime so that FileTailingAdaptor does
+ * not trigger an infinite loop and that slurp() is not called by
* FileTailingAdaptor if file is not updated (see CHUKWA-668)
+ *
* @throws IOException
* @throws ChukwaAgent.AlreadyRunningException
* @throws InterruptedException
*/
- public void testSlurpTimeUpdated() throws IOException, ChukwaAgent.AlreadyRunningException,
- InterruptedException{
- ChukwaAgent agent = new ChukwaAgent(conf);
- File testFile = makeTestFile("fooSlurp", 0,baseDir);
- long startTime = System.currentTimeMillis();
- String adaptorId = agent.processAddCommand("add adaptor_test =" +
- "filetailer.FileTailingAdaptor slurp " +testFile.getCanonicalPath() + " 0");
- FileTailingAdaptor fta = (FileTailingAdaptor)agent.getAdaptor( adaptorId);
- Thread.sleep(500);
- long initializedSlurpTimeValue = fta.lastSlurpTime;
- assertTrue( initializedSlurpTimeValue > startTime); // initialized to current time
-
- makeTestFile("fooSlurp", 2,baseDir);
- Chunk c = chunks.waitForAChunk();
+ public void testSlurpTimeUpdated() throws IOException,
+ ChukwaAgent.AlreadyRunningException, InterruptedException {
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
+ File testFile = makeTestFile("fooSlurp", 0, baseDir);
+ long startTime = System.currentTimeMillis();
+ String adaptorId = agent.processAddCommand("add adaptor_test ="
+ + "filetailer.FileTailingAdaptor slurp " + testFile.getCanonicalPath()
+ + " 0");
+ FileTailingAdaptor fta = (FileTailingAdaptor) agent.getAdaptor(adaptorId);
+ Thread.sleep(500);
+ long initializedSlurpTimeValue = fta.lastSlurpTime;
+ assertTrue(initializedSlurpTimeValue > startTime); // initialized to current
+ // time
- Thread.sleep(2000);
- // lastSlurpTime has been updated because a slurp was done
- long secondSlurpTimeValue = fta.lastSlurpTime;
- assertTrue( secondSlurpTimeValue > initializedSlurpTimeValue);
- assertEquals( fta.fileReadOffset, c.getData().length);
- assertEquals( fta.fileReadOffset, fta.reader.length());
-
- Thread.sleep(2000);
- // ensure we don't try to slurp if file is not updated
- assertEquals( fta.lastSlurpTime, secondSlurpTimeValue);
-
- if(agent.adaptorCount() > 0)
- agent.stopAdaptor("adaptor_test", false);
- agent.shutdown();
+ makeTestFile("fooSlurp", 2, baseDir);
+ Chunk c = chunks.waitForAChunk();
+
+ Thread.sleep(2000);
+ // lastSlurpTime has been updated because a slurp was done
+ long secondSlurpTimeValue = fta.lastSlurpTime;
+ assertTrue(secondSlurpTimeValue > initializedSlurpTimeValue);
+ assertEquals(fta.fileReadOffset, c.getData().length);
+ assertEquals(fta.fileReadOffset, fta.reader.length());
+
+ Thread.sleep(2000);
+ // ensure we don't try to slurp if file is not updated
+ assertEquals(fta.lastSlurpTime, secondSlurpTimeValue);
+
+ if (agent.adaptorCount() > 0)
+ agent.stopAdaptor("adaptor_test", false);
+ agent.shutdown();
}
}
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
index ba0e15d..6f9e2d3 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
@@ -41,6 +41,7 @@
public void testLogRotate() throws IOException, InterruptedException,
ChukwaAgent.AlreadyRunningException {
ChukwaAgent agent = ChukwaAgent.getAgent();
+ agent.start();
// Remove any adaptor left over from previous run
ChukwaConfiguration cc = new ChukwaConfiguration();
int portno = cc.getInt("chukwaAgent.control.port", 9093);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
index 7c5c2d3..f4ec73f 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
@@ -54,7 +54,8 @@
Configuration conf = new ChukwaConfiguration();
conf.set("", "org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector");
try {
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
Thread.sleep(2000);
Map<String, String> adaptorList = agent.getAdaptorList();
for(String id : adaptorList.keySet()) {
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
index 935ddd1..d2334d8 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
@@ -64,7 +64,8 @@
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File testFile = makeTestFile("chukwaRawTest", 80,
new File(System.getProperty("test.build.data", "/tmp")));
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
index a2ee740..363b1ce 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
@@ -46,7 +46,8 @@
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File testFile = makeTestFile();
int startOffset = 0; // skip first line
String adaptorId = agent
@@ -81,7 +82,8 @@
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File testFile = makeTestFile();
int startOffset = 0;
String adaptorId = agent
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
index 07c160c..dd5b702 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
@@ -34,7 +34,8 @@
try {
Configuration conf = new Configuration();
conf.setInt("chukwaAgent.control.port", 0);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
conn.start();
@@ -63,7 +64,8 @@
try {
Configuration conf = new Configuration();
conf.setInt("chukwaAgent.control.port", 0);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
conn.start();
int count = agent.adaptorCount();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
index 8f4eb73..c960b4d 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
@@ -57,7 +57,8 @@
checkpointDir.deleteOnExit();
conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getAbsolutePath());
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
conn.start();
assertEquals(1, agent.adaptorCount());// check that we processed initial
@@ -80,7 +81,8 @@
ps.close();
System.out.println("---------------------restarting");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
conn = new ConsoleOutConnector(agent, true);
conn.start();
assertEquals(2, agent.adaptorCount());// check that we processed initial
@@ -113,7 +115,8 @@
conf.setInt("chukwaAgent.control.port", 0);
System.out.println("\n\n===checkpoints enabled, dir does not exist:");
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.getAdaptorList().size());
agent.shutdown();
Thread.sleep(2000);
@@ -123,7 +126,8 @@
System.out
.println("\n\n===checkpoints enabled, dir exists but is empty:");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.getAdaptorList().size());
agent.shutdown();
Thread.sleep(2000);
@@ -133,7 +137,8 @@
System.out
.println("\n\n===checkpoints enabled, dir exists with zero-length file:");
(new File(NONCE_DIR, "chukwa_checkpoint_0")).createNewFile();
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.getAdaptorList().size());
agent.processAddCommand("ADD org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor testdata 0");
agent.shutdown();
@@ -142,7 +147,8 @@
System.out
.println("\n\n===checkpoints enabled, dir exists with valid checkpoint");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(1, agent.getAdaptorList().size());
agent.shutdown();
Thread.sleep(2000);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java
index 898e03c..bddbc8e 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChukwaSsl.java
@@ -86,7 +86,8 @@
conf.set(SSL_PROTOCOL, sslProtocol);
*/
//start agent, which starts chukwa rest server
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
System.out.println("Started ChukwaRestServer");
testSecureRestAdaptor(agent);
agent.shutdown();
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
index 66b85e7..051163b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
@@ -45,9 +45,9 @@
@Override
protected void setUp() throws AlreadyRunningException {
- agent = ChukwaAgent.getAgent();
if(agent == null){
- agent = new ChukwaAgent();
+ agent = ChukwaAgent.getAgent();
+ agent.start();
}
conf = agent.getConfiguration();
conf.set(CHUNK_QUEUE_LIMIT, Integer.toString(QUEUE_LIMIT));
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
index 6880fce..a34d221 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
@@ -36,7 +36,8 @@
try {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
conn.start();
String l = agent
@@ -68,7 +69,8 @@
try {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
conn.start();
String name = agent
@@ -99,7 +101,8 @@
try {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
conn.start();
String n = agent
@@ -129,7 +132,8 @@
public void testStopAll() throws Exception{
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ChunkCatcherConnector chunks = new ChunkCatcherConnector();
chunks.start();
agent.processAddCommand(
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
index 0d5bbd0..fa2d5f1 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
@@ -56,6 +56,7 @@
protected void setUp() throws Exception {
agent = ChukwaAgent.getAgent();
+ agent.start();
ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
index a3ed9f6..2cfd72c 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
@@ -46,7 +46,8 @@
Server collectorServ = TestDelayedAcks.startCollectorOnPort(conf, PORTNO, collector);
Thread.sleep(1000);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
conn.start();
String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() +
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
index b0d4d1c..6423fce 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
@@ -56,7 +56,8 @@
conf.setInt("constAdaptor.minSleep", 50);
conf.setInt("chukwaAgent.control.port", 0);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
RetryListOfCollectors clist = new RetryListOfCollectors(conf);
clist.add("http://localhost:"+PORTNO+"/chukwa");
HttpConnector conn = new HttpConnector(agent);
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
index eff7660..b067f16 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
@@ -73,7 +73,8 @@
conf.setInt("chukwaAgent.adaptor.context.switch.time", 500);
conf.setInt(AdaptorResetThread.TIMEOUT_OPT, ACK_TIMEOUT);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
ChunkCatcherConnector chunks = new ChunkCatcherConnector();
chunks.start();
assertEquals(0, agent.adaptorCount());
@@ -207,7 +208,8 @@
Server collectorServ = startCollectorOnPort(conf, PORTNO, collector);
Thread.sleep(1000);
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
conn.start();
String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() +
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
index f5f8d26..de34563 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
@@ -66,7 +66,8 @@
Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
Thread.sleep(2000); //for collectors to start
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
HttpConnector conn = new HttpConnector(agent);
RetryListOfCollectors clist = new RetryListOfCollectors(conf);
clist.add("http://localhost:"+(PORTNO+1)+"/");
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java
index 45e29d3..7eaaf05 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/controller/TestAgentClient.java
@@ -38,7 +38,8 @@
protected void setUp() throws ChukwaAgent.AlreadyRunningException {
config = new Configuration();
- agent = new ChukwaAgent(config);
+ agent = ChukwaAgent.getAgent(config);
+ agent.start();
c = new ChukwaAgentController();
connector = new ChunkCatcherConnector();
connector.start();
diff --git a/src/test/java/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java b/src/test/java/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java
deleted file mode 100644
index a0ca055..0000000
--- a/src/test/java/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.validationframework;
-
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
-import org.apache.hadoop.chukwa.datacollection.collector.CollectorStub;
-import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-import org.apache.hadoop.chukwa.validationframework.interceptor.ChunkDumper;
-import org.apache.hadoop.chukwa.validationframework.interceptor.SetupTestClasses;
-import org.apache.hadoop.chukwa.validationframework.util.DataOperations;
-
-public class ChukwaAgentToCollectorValidator {
- public static final int ADD = 100;
- public static final int VALIDATE = 200;
-
- private static void usage() {
- System.out.println("usage ...");
- System.exit(-1);
- }
-
- /**
- * @param args
- * @throws Throwable
- * @throws AlreadyRunningException
- * @throws IOException
- */
- public static void main(String[] args) throws Throwable {
- if (args.length != 2) {
- usage();
- }
-
- int command = -1;
-
- if ("-add".equalsIgnoreCase(args[0])) {
- command = ChukwaAgentToCollectorValidator.ADD;
- } else if ("-validate".equalsIgnoreCase(args[0])) {
- command = ChukwaAgentToCollectorValidator.VALIDATE;
- } else {
- usage();
- }
-
- String chukwaTestRepository = System.getenv("chukwaTestRepository");
- if (chukwaTestRepository == null) {
- chukwaTestRepository = "/tmp/chukwaTestRepository/";
- }
-
- if (!chukwaTestRepository.endsWith("/")) {
- chukwaTestRepository += "/";
- }
-
- String fileName = args[1];
-
- String name = null;
- if (fileName.indexOf("/") >= 0) {
- name = fileName.substring(fileName.lastIndexOf("/"));
- } else {
- name = fileName;
- }
-
- String chukwaTestDirectory = chukwaTestRepository + name;
- String inputFile = chukwaTestDirectory + "/input/" + name;
- String outputDir = null;
-
- if (command == ChukwaAgentToCollectorValidator.ADD) {
- File dir = new File(chukwaTestDirectory + "/input/");
- if (dir.exists()) {
- throw new RuntimeException(
- "a test with the same input file is already there, remove it first");
- }
- dir.mkdirs();
- DataOperations.copyFile(fileName, inputFile);
- outputDir = "/gold";
- } else {
- outputDir = "/" + System.currentTimeMillis();
- }
-
- System.out.println("chukwaTestDirectory [" + chukwaTestDirectory + "]");
- System.out.println("command ["
- + ((command == ChukwaAgentToCollectorValidator.ADD) ? "ADD"
- : "VALIDATE") + "]");
- System.out.println("fileName [" + inputFile + "]");
-
- ChukwaConfiguration conf = new ChukwaConfiguration(true);
- String collectorOutputDir = conf.get("chukwaCollector.outputDir");
-
- prepareAndSendData(chukwaTestDirectory + outputDir, inputFile,
- collectorOutputDir);
- extractRawLog(chukwaTestDirectory + outputDir, name, collectorOutputDir);
- boolean rawLogTestResult = validateRawLogs(chukwaTestDirectory + outputDir,
- name);
-
- boolean binLogTestResult = true;
-
- if (command == ChukwaAgentToCollectorValidator.VALIDATE) {
- binLogTestResult = validateOutputs(chukwaTestDirectory + outputDir, name);
- }
-
- if (rawLogTestResult == true && binLogTestResult == true) {
- System.out.println("test OK");
- System.exit(10);
- } else {
- System.out.println("test KO");
- throw new RuntimeException("test failed for file [" + name + "]");
- }
- }
-
- public static void prepareAndSendData(String dataRootFolder,
- String inputFile, String dataSinkDirectory) throws Throwable {
-
- ChunkDumper.testRepositoryDumpDir = dataRootFolder + "/";
-
- SetupTestClasses.setupClasses();
-
- // clean up the collector outputDir.
- File collectorDir = new File(dataSinkDirectory);
- String[] files = collectorDir.list();
- for (String f : files) {
- File file = new File(dataSinkDirectory + File.separator + f);
- file.delete();
- System.out.println("Deleting previous collectors files: " + f);
- }
-
- System.out.println("Starting agent");
- String[] agentArgs = new String[0];
- ChukwaAgent.main(agentArgs);
-
- // Start the collector
- System.out.println("Starting collector");
- CollectorStub.main(new String[0]);
-
- // Start the agent
- ChukwaAgent agent = ChukwaAgent.getAgent();
-
- int portno = 9093; // Default
- ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
- // ADD
- // org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.
- // CharFileTailingAdaptorUTF8NewLineEscaped
- // SysLog
- // 0 /var/log/messages
- // 0
- System.out.println("Adding adaptor");
- String adaptor = cli.add(
- "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
- "AutomatedTestType", "0 " + inputFile, 0);
-
- cli.remove(adaptor);
- System.out.println("Adaptor removed");
- agent.shutdown();
- System.out.println("Shutting down agent");
- CollectorStub.jettyServer.stop();
- System.out.println("Shutting down collector");
- Thread.sleep(2000);
- }
-
- public static void extractRawLog(String dataRootFolder, String fileName,
- String dataSinkDirectory) throws Exception {
- // Adaptor output
- DataOperations
- .extractRawLogFromDump(dataRootFolder + "/adaptor/", fileName);
- // Sender output
- DataOperations.extractRawLogFromDump(dataRootFolder + "/sender/", fileName);
-
- // Collector output
- File dir = new File(dataRootFolder + "/collector/");
- dir.mkdirs();
-
- File dataSinkDir = new File(dataSinkDirectory);
- String[] doneFiles = dataSinkDir.list();
- // Move done file to the final directory
- for (String f : doneFiles) {
- String outputFile = null;
- if (f.endsWith(".done")) {
- outputFile = fileName + ".done";
- } else {
- outputFile = fileName + ".crc";
- }
- System.out.println("Moving that file [" + dataSinkDirectory
- + File.separator + f + "] to [" + dataRootFolder + "/collector/"
- + outputFile + "]");
- DataOperations.copyFile(dataSinkDirectory + File.separator + f,
- dataRootFolder + "/collector/" + outputFile);
- }
-
- DataOperations.extractRawLogFromdataSink(ChunkDumper.testRepositoryDumpDir
- + "/collector/", fileName);
- }
-
- public static boolean validateRawLogs(String dataRootFolder, String fileName) {
- boolean result = true;
- // Validate Adaptor
- boolean adaptorMD5 = DataOperations.validateMD5(dataRootFolder
- + "/../input/" + fileName, dataRootFolder + "/adaptor/" + fileName
- + ".raw");
- if (!adaptorMD5) {
- System.out.println("Adaptor validation failed");
- result = false;
- }
- // Validate Sender
- boolean senderMD5 = DataOperations.validateMD5(dataRootFolder
- + "/../input/" + fileName, dataRootFolder + "/sender/" + fileName
- + ".raw");
- if (!senderMD5) {
- System.out.println("Sender validation failed");
- result = false;
- }
- // Validate DataSink
- boolean collectorMD5 = DataOperations.validateMD5(dataRootFolder
- + "/../input/" + fileName, dataRootFolder + "/collector/" + fileName
- + ".raw");
- if (!collectorMD5) {
- System.out.println("collector validation failed");
- result = false;
- }
-
- return result;
- }
-
- public static boolean validateOutputs(String dataRootFolder, String fileName) {
- boolean result = true;
- // Validate Adaptor
- boolean adaptorMD5 = DataOperations.validateMD5(dataRootFolder
- + "/../gold/adaptor/" + fileName + ".bin", dataRootFolder + "/adaptor/"
- + fileName + ".bin");
- if (!adaptorMD5) {
- System.out.println("Adaptor bin validation failed");
- result = false;
- }
- // Validate Sender
- boolean senderMD5 = DataOperations.validateMD5(dataRootFolder
- + "/../gold/sender/" + fileName + ".bin", dataRootFolder + "/sender/"
- + fileName + ".bin");
- if (!senderMD5) {
- System.out.println("Sender bin validation failed");
- result = false;
- }
- // Validate DataSink
- // boolean collectorMD5 = DataOperations.validateRawLog(dataRootFolder +
- // "/../gold/collector/" + fileName + ".done", dataRootFolder +
- // "/collector/" + fileName + ".done");
- // if (!collectorMD5)
- // {
- // System.out.println("collector bin validation failed");
- // result = false;
- // }
-
- return result;
- }
-}
\ No newline at end of file