/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.connector.http;
/**
 * This class is responsible for setting up a {@link HttpConnectorClient} with a list of
 * collectors and then repeatedly calling its send function, which encapsulates the work of
 * setting up the connection with the appropriate collector and then collecting and sending the
 * {@link Chunk}s from the global {@link ChunkQueue} which were added by {@link Adaptors}. We want
 * to separate the long-lived (i.e. looping) behavior from the ConnectorClient because we also
 * want to be able to use the HttpConnectorClient for its add and send API in arbitrary
 * applications that want to send chunks without a {@link LocalAgent} daemon.
 *
 * <p>
 * On error, tries the list of available collectors, pauses for a minute, and then repeats.
 * </p>
 * <p>Will wait forever for collectors to come up.</p>
 */
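/*
 * A minimal standalone-use sketch (illustrative only, so kept as a comment):
 * this is the same sender API the connector drives below, but the collector
 * URL and the chunk list are placeholders supplied by the caller.
 *
 *   Configuration conf = new Configuration();
 *   ChukwaSender sender = new ChukwaHttpSender(conf);
 *   List<String> urls = new ArrayList<String>();
 *   urls.add("http://somehost:8080/chukwa");   // hypothetical collector URL
 *   sender.setCollectors(urls.iterator());
 *   List<Chunk> chunksToShip = ...;            // assembled by the caller
 *   List<ChukwaHttpSender.CommitListEntry> acked = sender.send(chunksToShip);
 */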
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.chukwa.datacollection.sender.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
public class HttpConnector implements Connector, Runnable {
static Logger log = Logger.getLogger(HttpConnector.class);
Timer statTimer = null;
AtomicInteger chunkCount = new AtomicInteger();
int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
int MIN_POST_INTERVAL = 5 * 1000;
public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
boolean ASYNC_ACKS = false;
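  // Tunables, read from the agent Configuration in start():
  //   httpConnector.maxPostSize     - max bytes drained from the queue per post (default 2 MB)
  //   httpConnector.minPostInterval - minimum milliseconds between posts (default 5 s)
  //   httpConnector.asyncAcks      - use AsyncAckSender instead of ChukwaHttpSender (default false)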
ChunkQueue chunkQueue;
ChukwaAgent agent;
String argDestination = null;
private volatile boolean stopMe = false;
private Iterator<String> collectors = null;
protected ChukwaSender connectorClient = null;
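  // The initializer below schedules a once-a-minute stats task that logs how
  // many chunks were ACKed since the last report and then resets the counter.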
{ //instance initializer block
statTimer = new Timer();
statTimer.schedule(new TimerTask() {
public void run() {
int count = chunkCount.get();
chunkCount.set(0);
log.info("# http chunks ACK'ed since last report: " + count);
}
}, 100, 60 * 1000);
}
public HttpConnector(ChukwaAgent agent) {
this.agent = agent;
}
public HttpConnector(ChukwaAgent agent, String destination) {
this.agent = agent;
this.argDestination = destination;
log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
+ destination);
}
public void start() {
chunkQueue = DataFactory.getInstance().getEventQueue();
Configuration conf = agent.getConfiguration();
MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
(new Thread(this, "HTTP post thread")).start();
}
  public void shutdown() {
    stopMe = true;
    if (connectorClient != null) { // start() may not have created a sender yet
      connectorClient.stop();
    }
  }
public void run() {
log.info("HttpConnector started at time:" + System.currentTimeMillis());
// build a list of our destinations from collectors
try {
if(collectors == null)
collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
} catch (IOException e) {
log.error("Failed to retrieve list of collectors from "
+ "conf/collectors file", e);
}
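    // Pick a sender implementation: AsyncAckSender handles commit acks that
    // arrive asynchronously from collectors, while ChukwaHttpSender gets its
    // commit list back synchronously from each post.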
if(ASYNC_ACKS) {
try {
connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
} catch(IOException e) {
log.fatal("can't read AsycAck hostlist file, exiting");
agent.shutdown(true);
}
} else
connectorClient = new ChukwaHttpSender(agent.getConfiguration());
if (argDestination != null) {
ArrayList<String> tmp = new ArrayList<String>();
tmp.add(argDestination);
collectors = tmp.iterator();
log.info("using collector specified at agent runtime: " + argDestination);
} else
log.info("using collectors from collectors file");
if (collectors == null || !collectors.hasNext()) {
log.error("No collectors specified, exiting (and taking agent with us).");
agent.shutdown(true);// error is unrecoverable, so stop hard.
return;
}
connectorClient.setCollectors(collectors);
try {
long lastPost = System.currentTimeMillis();
while (!stopMe) {
List<Chunk> newQueue = new ArrayList<Chunk>();
try {
        // get all ready chunks from the chunkQueue to be sent
        chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should really do this by size
} catch (InterruptedException e) {
          log.warn("thread interrupted during addChunks(ChunkQueue)");
Thread.currentThread().interrupt();
break;
}
List<ChukwaHttpSender.CommitListEntry> results = connectorClient
.send(newQueue);
// checkpoint the chunks which were committed
for (ChukwaHttpSender.CommitListEntry cle : results) {
agent.reportCommit(cle.adaptor, cle.uuid);
          chunkCount.incrementAndGet(); // atomic; the old get()/set() pair could race with the stat timer's reset
}
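        // Throttle: keep successive posts at least MIN_POST_INTERVAL ms apart
        // so small chunks can accumulate into larger batches.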
        long now = System.currentTimeMillis();
        long delta = MIN_POST_INTERVAL - (now - lastPost);
        if (delta > 0) {
          Thread.sleep(delta); // wait for stuff to accumulate
        }
        lastPost = System.currentTimeMillis(); // measure from after the sleep so posts are evenly spaced
} // end of try forever loop
log.info("received stop() command so exiting run() loop to shutdown connector");
} catch (OutOfMemoryError e) {
log.warn("Bailing out", e);
} catch (InterruptedException e) {
// do nothing, let thread die.
log.warn("Bailing out", e);
} catch (java.io.IOException e) {
log.error("connector failed; shutting down agent");
agent.shutdown(true);
}
}
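  /**
   * Re-reads the collector list from conf/collectors; the current collector
   * list is kept if the new one cannot be read or is empty.
   */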
@Override
public void reloadConfiguration() {
Iterator<String> destinations = null;
// build a list of our destinations from collectors
try {
destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
} catch (IOException e) {
log.error("Failed to retreive list of collectors from conf/collectors file", e);
}
if (destinations != null && destinations.hasNext()) {
collectors = destinations;
connectorClient.setCollectors(collectors);
log.info("Resetting collectors");
}
}
public ChukwaSender getSender() {
return connectorClient;
}
public void setCollectors(Iterator<String> list) {
collectors = list;
}
}