blob: 7ab915ff56b7792e04f68ec646e96cf6ed512187 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.controller;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.log4j.Logger;
/**
* A convenience library for applications to communicate to the
* {@link ChukwaAgent}. Can be used to register and unregister new
* {@link Adaptor}s. Also contains functions for applications to use for
* handling log rotations.
*/
public class ChukwaAgentController {
/** log4j logger shared by the controller, its nested classes and the command line helpers. */
static Logger log = Logger.getLogger(ChukwaAgentController.class);
/**
 * Timer task that re-issues an adaptor registration. When run, it calls
 * {@code addByName} on the enclosing controller with a {@code null} adaptor
 * id and the values captured at construction time.
 */
public class AddAdaptorTask extends TimerTask {
  String adaptorName;
  String type;
  String params;
  private long offset;
  long numRetries;
  long retryInterval;

  /**
   * @param adaptorName adaptor class name to register
   * @param type data type passed through to {@code addByName}
   * @param params adaptor specific parameters
   * @param offset starting offset
   * @param numRetries retry budget handed to {@code addByName}
   * @param retryInterval delay, in milliseconds, handed to {@code addByName}
   */
  AddAdaptorTask(String adaptorName, String type, String params, long offset,
      long numRetries, long retryInterval) {
    this.retryInterval = retryInterval;
    this.numRetries = numRetries;
    this.offset = offset;
    this.params = params;
    this.type = type;
    this.adaptorName = adaptorName;
  }

  /** Logs the attempt and re-sends the add command; any exception is logged, never rethrown. */
  @Override
  public void run() {
    try {
      StringBuilder announcement = new StringBuilder("Trying to resend the add command [");
      announcement.append(adaptorName).append("][");
      announcement.append(offset).append("][");
      announcement.append(params).append("] [");
      announcement.append(numRetries).append("]");
      log.info(announcement.toString());
      addByName(null, adaptorName, type, params, offset, numRetries, retryInterval);
    } catch (Exception failure) {
      log.warn("Exception in AddAdaptorTask.run", failure);
      failure.printStackTrace();
    }
  }
}
// our default adaptors, provided here for convenience
/** Fully qualified class name of the UTF-8 character file tailing adaptor. */
public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
/** Fully qualified class name of the newline-escaped UTF-8 character file tailing adaptor. */
public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
/** Adaptor class used by the file convenience methods (addFile, pauseFile, resumeFile, removeFile, isFilePaused). */
static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
/** Agent control port used by the no-argument constructor. */
static int DEFAULT_PORT = 9093;
/** Agent host used by the no-argument constructor. */
static String DEFAULT_HOST = "localhost";
/** Count of command line arguments excluding -h/-p flags and their values; set by getClient, read by main. */
static int numArgs = 0;
class Adaptor {
public String id;
final public String className;
final public String params;
final public String appType;
public long offset;
Adaptor(String className, String appType, String params, long offset) {
this.className = className;
this.appType = appType;
this.params = params;
this.offset = offset;
}
Adaptor(String id, String className, String appType, String params,
long offset) {
this.id = id;
this.className = className;
this.appType = appType;
this.params = params;
this.offset = offset;
}
/**
* Registers this {@link Adaptor} with the agent running at the specified
* hostname and portno
*
* @return The id of the this {@link Adaptor}, assigned by the agent
* upon successful registration
* @throws IOException if problem bind to agent controller port
*/
String register() throws IOException {
Socket s = new Socket(hostname, portno);
try {
s.setSoTimeout(60000);
} catch (SocketException e) {
log.warn("Error while settin soTimeout to 60000");
e.printStackTrace();
}
PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
.getOutputStream(), Charset.forName("UTF-8")));
if(id != null)
bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
else
bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
bw.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(s
.getInputStream(), Charset.forName("UTF-8")));
String resp = br.readLine();
if (resp != null) {
String[] fields = resp.split(" ");
if (fields[0].equals("OK")) {
id = fields[fields.length - 1];
}
}
s.close();
return id;
}
void unregister() throws IOException {
Socket s = new Socket(hostname, portno);
try {
s.setSoTimeout(60000);
} catch (SocketException e) {
log.warn("Error while settin soTimeout to 60000");
}
PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
.getOutputStream(), Charset.forName("UTF-8")));
bw.println("SHUTDOWN " + id);
bw.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(s
.getInputStream(), Charset.forName("UTF-8")));
String resp = br.readLine();
if (resp == null || !resp.startsWith("OK")) {
log.error("adaptor unregister error, id: " + id);
} else if (resp.startsWith("OK")) {
String[] respSplit = resp.split(" ");
String newOffset = respSplit[respSplit.length - 1];
try {
offset = Long.parseLong(newOffset);
} catch (NumberFormatException nfe) {
log.error("adaptor didn't shutdown gracefully.\n" + nfe);
}
}
s.close();
}
public String toString() {
String[] namePieces = className.split("\\.");
String shortName = namePieces[namePieces.length - 1];
return id + " " + shortName + " " + appType + " " + params + " " + offset;
}
}
/** Local view of the agent's adaptors keyed by adaptor id; replaced wholesale by syncWithAgent(). */
Map<String, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<String, Adaptor>();
/** Adaptors successfully registered through addByName on this controller instance, keyed by adaptor id. */
Map<String, ChukwaAgentController.Adaptor> runningInstanceAdaptors = new HashMap<String, Adaptor>();
/** Adaptors stopped by pauseFile and kept so resumeFile can re-add them; created in the constructors. */
Map<String, ChukwaAgentController.Adaptor> pausedAdaptors;
/** Host name of the agent that control sockets are opened to. */
String hostname;
/** Port of the agent that control sockets are opened to. */
int portno;
/**
 * Creates a controller for the agent at {@code DEFAULT_HOST}:{@code DEFAULT_PORT}
 * and loads the agent's current adaptor list.
 */
public ChukwaAgentController() {
  this(DEFAULT_HOST, DEFAULT_PORT);
}
/**
 * Creates a controller for the agent at the given address and loads the
 * agent's current adaptor list. A failed initial sync is not an error here:
 * the local list simply starts empty.
 *
 * @param hostname host the agent is running on
 * @param portno control port of the agent
 */
public ChukwaAgentController(String hostname, int portno) {
  this.pausedAdaptors = new HashMap<String, Adaptor>();
  this.portno = portno;
  this.hostname = hostname;
  syncWithAgent();
}
/**
 * Replaces {@code runningAdaptors} with the list reported by the agent. If
 * the agent cannot be queried, a message is written to stderr and the local
 * list is reset to an empty map.
 *
 * @return {@code true} if the list was refreshed from the agent,
 *         {@code false} if it was cleared because of an I/O failure
 */
private boolean syncWithAgent() {
  Map<String, ChukwaAgentController.Adaptor> refreshed;
  boolean synced;
  try {
    refreshed = list();
    synced = true;
  } catch (IOException unreachable) {
    System.err.println("Error initializing ChukwaClient with list of "
        + "currently registered adaptors, clearing our local list of adaptors");
    // Without a reachable agent our previous view is stale, so start empty.
    refreshed = new HashMap<String, ChukwaAgentController.Adaptor>();
    synced = false;
  }
  runningAdaptors = refreshed;
  return synced;
}
/**
 * Registers a new adaptor and lets the agent choose its id. Makes no
 * guarantee about success. This is a shorthand for
 * {@link #addByName(String, String, String, String, long, long, long)} with a
 * {@code null} adaptor id, 20 retries and a 15 second retry interval, so a
 * failed connection is retried for roughly five minutes.
 *
 * @param adaptorName is adaptor class name
 * @param type is data type
 * @param params is adaptor specific parameters
 * @param offset is starting sequence id
 *
 * @return the id of the adaptor as returned by {@code addByName}; may be
 *         {@code null} when the adaptor was not registered by this call
 */
public String add(String adaptorName, String type, String params, long offset) {
  final long retryCount = 20;
  final long retryIntervalMillis = 15 * 1000;
  return addByName(null, adaptorName, type, params, offset, retryCount,
      retryIntervalMillis);
}
/**
 * Registers a new adaptor. Makes no guarantee about success. If the agent
 * cannot be contacted, the registration is re-attempted on a timer thread
 * after <code>retryInterval</code> milliseconds, up to
 * <code>numRetries</code> more times; the retry keeps the requested
 * <code>adaptorID</code>.
 *
 * @param adaptorID is unique adaptor identifier, or {@code null} to let the
 *        agent generate one
 * @param adaptorName is adaptor class name
 * @param type is user defined data type name
 * @param params is adaptor specific configuration
 * @param offset is starting sequence id
 * @param numRetries is number of retries; a negative value gives up
 *        immediately
 * @param retryInterval is time between retries, in milliseconds
 *
 * @return the id reported by the agent; {@code null} if the agent answered
 *         but did not accept the adaptor; the unchanged
 *         <code>adaptorID</code> argument if the agent could not be
 *         contacted by this call (a retry may still succeed later)
 */
public String addByName(String adaptorID, String adaptorName, String type, String params, long offset,
    long numRetries, long retryInterval) {
  if (numRetries < 0) {
    System.err.println("Giving up on connecting to the local agent");
    return adaptorID;
  }
  ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
      adaptorID, adaptorName, type, params, offset);
  try {
    String assignedID = adaptor.register();
    if (assignedID != null) {
      runningAdaptors.put(assignedID, adaptor);
      runningInstanceAdaptors.put(assignedID, adaptor);
    } else {
      System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was null.");
    }
    return assignedID;
  } catch (IOException ioe) {
    log.warn("AgentClient failed to contact the agent ("
        + hostname + ":" + portno + ")");
    log.warn("Scheduling a agent connection retry for adaptor add() in another "
        + retryInterval
        + " milliseconds, "
        + numRetries
        + " retries remaining");
    scheduleAddRetry(adaptorID, adaptorName, type, params, offset,
        numRetries - 1, retryInterval);
    return adaptorID;
  }
}

/** Schedules a single delayed addByName call that keeps the requested adaptor id. */
private void scheduleAddRetry(final String adaptorID, final String adaptorName,
    final String type, final String params, final long offset,
    final long retriesLeft, final long retryInterval) {
  final Timer addFileTimer = new Timer();
  addFileTimer.schedule(new TimerTask() {
    @Override
    public void run() {
      try {
        log.info("Trying to resend the add command [" + adaptorName + "]["
            + offset + "][" + params + "] [" + retriesLeft + "]");
        addByName(adaptorID, adaptorName, type, params, offset, retriesLeft,
            retryInterval);
      } catch (Exception e) {
        log.warn("Exception while retrying adaptor add", e);
      } finally {
        // One-shot timer: cancel it so its thread ends instead of lingering.
        addFileTimer.cancel();
      }
    }
  }, retryInterval);
}
/**
 * Refreshes the local adaptor list from the agent, drops the entry with the
 * given id from it and, if such an entry existed, asks the agent to shut
 * that adaptor down.
 *
 * @param adaptorID id of the adaptor to remove
 * @return the removed adaptor record, or {@code null} if no adaptor with
 *         that id was in the refreshed list
 * @throws IOException if the shutdown request fails
 */
public synchronized ChukwaAgentController.Adaptor remove(String adaptorID)
    throws IOException {
  syncWithAgent();
  final ChukwaAgentController.Adaptor removed = runningAdaptors.remove(adaptorID);
  if (removed == null) {
    return null;
  }
  removed.unregister();
  return removed;
}
/**
 * Removes every adaptor whose class name, parameters and application type
 * are all equal to the given values.
 *
 * @param className adaptor class name to match exactly
 * @param appType application type to match exactly
 * @param filename adaptor parameter string to match exactly
 * @throws IOException if a shutdown request fails
 */
public void remove(String className, String appType, String filename)
    throws IOException {
  syncWithAgent();
  // remove(id) re-syncs and thereby swaps runningAdaptors for a new map, so
  // the view captured here is not modified while it is being walked.
  final Collection<Adaptor> candidates = runningAdaptors.values();
  for (Adaptor candidate : candidates) {
    if (!candidate.className.equals(className)) {
      continue;
    }
    if (!candidate.params.equals(filename)) {
      continue;
    }
    if (candidate.appType.equals(appType)) {
      remove(candidate.id);
    }
  }
}
/**
 * Removes every adaptor currently reported by the agent. Failures for
 * individual adaptors are logged and do not stop the remaining removals;
 * success is only logged for adaptors that were actually removed.
 */
public void removeAll() {
  syncWithAgent();
  // Copy the ids first: remove(id) re-syncs and changes runningAdaptors.
  List<String> ids = new ArrayList<String>(runningAdaptors.keySet());
  for (String id : ids) {
    try {
      if (remove(id) != null) {
        log.info("Successfully removed adaptor " + id);
      } else {
        log.warn("Adaptor " + id + " was not removed: it is no longer in the agent's adaptor list");
      }
    } catch (IOException ioe) {
      log.warn("Error removing adaptor " + id + " in removeAll()", ioe);
    }
  }
}
/**
 * Removes the adaptors that were registered through this controller
 * instance. They are matched by class name, application type and parameters
 * rather than by id, so that a different adaptor that received the same id
 * after an agent crash is not shut down by mistake. Failures are logged and
 * the remaining adaptors are still processed.
 */
public void removeInstanceAdaptors() {
  Iterator<Adaptor> mine = runningInstanceAdaptors.values().iterator();
  while (mine.hasNext()) {
    Adaptor own = mine.next();
    try {
      remove(own.className, own.appType, own.params);
    } catch (IOException failure) {
      log.warn("Error removing an adaptor in removeInstanceAdaptors()");
      failure.printStackTrace();
    }
  }
}
/**
 * Sends LIST to the agent and parses the response into adaptor records.
 * Reading stops at the first empty line or at end of stream. Each line is
 * split on whitespace; the first token minus its last character is the id,
 * the second the class name, the third the application type, the last token
 * the offset, and the tokens from the fourth up to (not including) the last
 * are re-joined with single spaces as the parameters. Lines with fewer than
 * four tokens are ignored; lines with an empty first token or a non-numeric
 * offset are skipped with a warning.
 *
 * @return adaptor records keyed by adaptor id
 * @throws IOException if the agent cannot be reached or the exchange fails
 */
Map<String, ChukwaAgentController.Adaptor> list() throws IOException {
  Socket s = new Socket(hostname, portno);
  try {
    try {
      s.setSoTimeout(60000);
    } catch (SocketException e) {
      log.warn("Error while settin soTimeout to 60000", e);
    }
    PrintWriter bw = new PrintWriter(
        new OutputStreamWriter(s.getOutputStream(), Charset.forName("UTF-8")));
    bw.println("LIST");
    bw.flush();
    BufferedReader br = new BufferedReader(new InputStreamReader(
        s.getInputStream(), Charset.forName("UTF-8")));
    Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
    String ln;
    while ((ln = br.readLine()) != null && !ln.equals("")) {
      String[] parts = ln.split("\\s+");
      if (parts.length < 4) {
        continue;
      }
      if (parts[0].isEmpty()) {
        // A line with leading whitespace yields an empty first token; there is no id to chop.
        log.warn("Skipping malformed LIST line: " + ln);
        continue;
      }
      long offset;
      try {
        offset = Long.parseLong(parts[parts.length - 1]);
      } catch (NumberFormatException nfe) {
        log.warn("Skipping LIST line with non-numeric offset: " + ln);
        continue;
      }
      // chop off the trailing paren that follows the id
      String id = parts[0].substring(0, parts[0].length() - 1);
      StringBuilder tmpParams = new StringBuilder(parts[3]);
      for (int i = 4; i < parts.length - 1; i++) {
        tmpParams.append(" ").append(parts[i]);
      }
      listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams.toString(),
          offset));
    }
    return listResult;
  } finally {
    // Close the socket on every path, including when the exchange throws.
    s.close();
  }
}
// ************************************************************************
// The following functions are convenience functions, defining an easy
// to use API for application developers to integrate chukwa into their app
// ************************************************************************
/**
 * Registers a new adaptor of the default file tailing type
 * ({@code DEFAULT_FILE_TAILER}) for the given file, starting at offset 0.
 * The file name is first made absolute. If the local adaptor list already
 * holds a default file tailer with the same application type whose
 * parameters end with that path, nothing is registered. If you have tailed
 * this file before and rotated or emptied it, use
 * {@link ChukwaAgentController#pauseFile(String, String)} and
 * {@link ChukwaAgentController#resumeFile(String, String)} instead, which
 * keep the adaptor's state and pick up where it left off.
 *
 * @param appType is user defined name for the data stream
 * @param filename of the file for the tail adaptor to start monitoring
 * @param numRetries is number of retries
 * @param retryInterval is time between retries
 * @return the id of the adaptor as returned by {@code addByName}, or
 *         {@code null} if the file is already being watched
 */
public String addFile(String appType, String filename, long numRetries,
    long retryInterval) {
  filename = new File(filename).getAbsolutePath();
  // No existence check: the agent may be running on a different machine.
  boolean alreadyWatched = false;
  Iterator<Adaptor> known = runningAdaptors.values().iterator();
  while (known.hasNext()) {
    Adaptor existing = known.next();
    if (existing.className.equals(DEFAULT_FILE_TAILER)
        && existing.appType.equals(appType)
        && existing.params.endsWith(filename)) {
      alreadyWatched = true;
    }
  }
  if (alreadyWatched) {
    log.info("An adaptor for filename \"" + filename
        + "\", type \"" + appType
        + "\", exists already, addFile() command aborted");
    return null;
  }
  final String tailerParams = "0 " + filename;
  return addByName(null, DEFAULT_FILE_TAILER, appType, tailerParams, 0L,
      numRetries, retryInterval);
}
/**
 * Registers a default file tailing adaptor for the file with zero retries
 * and a zero retry interval.
 *
 * @param appType is user defined name for the data stream
 * @param filename of the file for the tail adaptor to start monitoring
 * @return the result of the four-argument {@code addFile}
 */
public String addFile(String appType, String filename) {
  final long noRetries = 0;
  final long noRetryDelay = 0;
  return addFile(appType, filename, noRetries, noRetryDelay);
}
/**
 * Pauses all active adaptors of the default file tailing type that are
 * tailing this file. The adaptor is actually stopped on the agent and goes
 * away, but its state is kept locally so that
 * {@link #resumeFile(String, String)} can launch a new adaptor with the same
 * state later. The kept state is the record returned by the shutdown, so it
 * carries the offset the agent reported when stopping the adaptor.
 *
 * @param appType is application type
 * @param filename is file name suffix; adaptors whose parameters end with it
 *        are paused
 * @return the ids of the adaptors that were paused
 * @throws IOException if error pausing adaptors
 */
public Collection<String> pauseFile(String appType, String filename)
    throws IOException {
  syncWithAgent();
  List<String> results = new ArrayList<String>();
  // remove(id) re-syncs and swaps runningAdaptors for a new map, so the map
  // being iterated here is not modified by the loop body.
  for (Adaptor a : runningAdaptors.values()) {
    if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
        && a.appType.equals(appType)) {
      // Record the adaptor as paused before contacting the agent, as before.
      pausedAdaptors.put(a.id, a);
      // remove() works on a freshly listed record; that record, not 'a',
      // receives the final offset from the SHUTDOWN response.
      Adaptor stopped = remove(a.id);
      if (stopped != null) {
        pausedAdaptors.put(a.id, stopped);
      }
      results.add(a.id);
    }
  }
  return results;
}
/**
 * Tells whether a paused adaptor of the default file tailing type exists for
 * this file.
 *
 * @param appType is application type to match exactly
 * @param filename is file name suffix the adaptor's parameters must end with
 * @return {@code true} if at least one matching paused adaptor is recorded
 */
public boolean isFilePaused(String appType, String filename) {
  for (Adaptor paused : pausedAdaptors.values()) {
    if (!paused.className.equals(DEFAULT_FILE_TAILER)) {
      continue;
    }
    if (!paused.params.endsWith(filename)) {
      continue;
    }
    if (paused.appType.equals(appType)) {
      return true;
    }
  }
  return false;
}
/**
 * Resumes all adaptors for this filename that have been paused. For each
 * matching paused adaptor a new default file tailing adaptor is added at the
 * stored offset, the paused record is dropped, and the record's id is
 * replaced by the new id.
 *
 * @param appType is application type
 * @param filename filename by which to lookup adaptors which are paused (and
 *        tailing this file)
 * @return the ids returned by {@link #add(String, String, String, long)} for
 *         the resumed adaptors; an entry may be {@code null} if the add did
 *         not yield an id
 * @throws IOException if unable to resume all adaptors
 */
public Collection<String> resumeFile(String appType, String filename)
    throws IOException {
  syncWithAgent();
  List<String> results = new ArrayList<String>();
  // Remove through the iterator: calling pausedAdaptors.remove() inside a
  // for-each over values() throws ConcurrentModificationException as soon as
  // another entry follows the removed one.
  Iterator<Adaptor> paused = pausedAdaptors.values().iterator();
  while (paused.hasNext()) {
    Adaptor a = paused.next();
    if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
        && a.appType.equals(appType)) {
      String newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
          + filename, a.offset);
      paused.remove();
      a.id = newID;
      results.add(newID);
    }
  }
  return results;
}
/**
 * Removes every adaptor of the default file tailing type with the given
 * application type whose parameters end with the given file name.
 *
 * @param appType is application type to match exactly
 * @param filename is file name suffix the adaptor's parameters must end with
 * @throws IOException if a shutdown request fails
 */
public void removeFile(String appType, String filename) throws IOException {
  syncWithAgent();
  // remove(id) re-syncs and thereby swaps runningAdaptors for a new map, so
  // the view captured here is not modified while it is being walked.
  final Collection<Adaptor> candidates = runningAdaptors.values();
  for (Adaptor candidate : candidates) {
    if (!candidate.className.equals(DEFAULT_FILE_TAILER)) {
      continue;
    }
    if (!candidate.params.endsWith(filename)) {
      continue;
    }
    if (candidate.appType.equals(appType)) {
      remove(candidate.id);
    }
  }
}
// ************************************************************************
// command line utilities
// ************************************************************************
/**
 * Command line entry point. The first argument selects the command
 * (case-insensitively): {@code addfile} and {@code removefile} take an
 * application type and a file name, {@code list} and {@code removeall} take
 * no further positional arguments. The agent address can be overridden with
 * {@code -h hostname} and {@code -p portnumber}. Anything else prints the
 * usage text to stderr.
 *
 * @param args command line arguments
 */
public static void main(String[] args) {
  ChukwaAgentController c = getClient(args);
  // equalsIgnoreCase for every command: unlike toLowerCase().equals(...) it
  // does not depend on the default locale.
  if (numArgs >= 3 && args[0].equalsIgnoreCase("addfile")) {
    doAddFile(c, args[1], args[2]);
  } else if (numArgs >= 3 && args[0].equalsIgnoreCase("removefile")) {
    doRemoveFile(c, args[1], args[2]);
  } else if (numArgs >= 1 && args[0].equalsIgnoreCase("list")) {
    doList(c);
  } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
    doRemoveAll(c);
  } else {
    // Only forms that the dispatch above and getClient() actually accept are listed.
    System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
    System.err.println(" ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
    System.err.println(" ChukwaClient list [-h hostname] [-p portnumber]");
    System.err.println(" ChukwaClient removeAll [-h hostname] [-p portnumber]");
  }
}
/**
 * Builds a controller from the command line. {@code -h <hostname>} and
 * {@code -p <portnumber>} override {@code DEFAULT_HOST} and
 * {@code DEFAULT_PORT}. As a side effect {@code numArgs} is set to the
 * number of arguments left after discounting each recognized flag and its
 * value.
 *
 * @param args command line arguments
 * @return a controller connected to the selected agent address
 * @throws NumberFormatException if the value following {@code -p} is not an
 *         integer
 */
private static ChukwaAgentController getClient(String[] args) {
  int portno = DEFAULT_PORT;
  String hostname = DEFAULT_HOST;
  numArgs = args.length;
  for (int i = 0; i < args.length; i++) {
    boolean hasValue = args.length > i + 1;
    if (hasValue && args[i].equals("-h")) {
      // ++i consumes the value so it is not examined as a flag itself.
      hostname = args[++i];
      log.debug("Setting hostname to: " + hostname);
      numArgs -= 2; // subtract for the flag and value
    } else if (hasValue && args[i].equals("-p")) {
      portno = Integer.parseInt(args[++i]);
      log.debug("Setting portno to: " + portno);
      numArgs -= 2; // subtract for the flag, i.e. -p, and value
    }
  }
  return new ChukwaAgentController(hostname, portno);
}
/**
 * Command line helper: adds a default file tailing adaptor and logs the
 * outcome.
 *
 * @param c controller to use
 * @param appType application type for the new adaptor
 * @param params name of the file to tail
 * @return the id returned by {@code addFile}, or {@code null} on failure
 */
private static String doAddFile(ChukwaAgentController c, String appType,
    String params) {
  log.info("Adding adaptor with filename: " + params);
  final String newId = c.addFile(appType, params);
  if (newId == null) {
    log.error("Agent reported failure to add adaptor.");
    return null;
  }
  log.info("Successfully added adaptor, id is:" + newId);
  return newId;
}
/**
 * Command line helper: removes the default file tailing adaptors for a file.
 * An I/O failure is printed as a stack trace and otherwise ignored.
 *
 * @param c controller to use
 * @param appType application type of the adaptors to remove
 * @param params file name (suffix) of the adaptors to remove
 */
private static void doRemoveFile(ChukwaAgentController c, String appType,
    String params) {
  final String notice = "Removing adaptor with filename: " + params;
  try {
    log.debug(notice);
    c.removeFile(appType, params);
  } catch (IOException failure) {
    failure.printStackTrace();
  }
}
/**
 * Command line helper: fetches the agent's adaptor list and writes each
 * adaptor to the log at debug level. Any exception is printed as a stack
 * trace and otherwise ignored.
 *
 * @param c controller to use
 */
private static void doList(ChukwaAgentController c) {
  try {
    for (Adaptor listed : c.list().values()) {
      log.debug(listed.toString());
    }
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}
/**
 * Command line helper: announces and performs the removal of all adaptors
 * known to the agent.
 *
 * @param c controller to use
 */
private static void doRemoveAll(ChukwaAgentController c) {
  final String notice = "Removing all adaptors";
  log.info(notice);
  c.removeAll();
}
}