blob: bcb167a44e9593a7d80d1c547ca867cfe3de7b86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.connector;
/**
* This class is responsible for setting up connections with configured
* storage writers base on configuration of chukwa_agent.xml.
*
* On error, tries the list of available storage writers, pauses for a minute,
* and then repeats.
*
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
public class PipelineConnector implements Connector, Runnable {
static Logger log = Logger.getLogger(PipelineConnector.class);
Timer statTimer = null;
volatile int chunkCount = 0;
int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
int MIN_POST_INTERVAL = 5 * 1000;
public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";
ChunkQueue chunkQueue;
private ChukwaAgent agent = null;
private volatile boolean stopMe = false;
protected ChukwaWriter writers = null;
public PipelineConnector() {
//instance initializer block
statTimer = new Timer();
statTimer.schedule(new TimerTask() {
public void run() {
int count = chunkCount;
chunkCount = 0;
log.info("# Data chunks sent since last report: " + count);
}
}, 100, 60 * 1000);
}
public void start() {
chunkQueue = DataFactory.getInstance().getEventQueue();
agent = ChukwaAgent.getAgent();
Configuration conf = agent.getConfiguration();
MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
try {
writers = new PipelineStageWriter(conf);
(new Thread(this, "Pipeline connector thread")).start();
} catch(Exception e) {
log.error("Pipeline initialization error: ", e);
}
}
public void shutdown() {
stopMe = true;
try {
writers.close();
} catch (WriterException e) {
log.warn("Shutdown error: ",e);
}
}
public void run() {
log.info("PipelineConnector started at time:" + System.currentTimeMillis());
try {
long lastPost = System.currentTimeMillis();
while (!stopMe) {
List<Chunk> newQueue = new ArrayList<Chunk>();
try {
// get all ready chunks from the chunkQueue to be sent
chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
CommitStatus result = writers.add(newQueue);
if(result.equals(ChukwaWriter.COMMIT_OK)) {
chunkCount = newQueue.size();
for (Chunk c : newQueue) {
agent.reportCommit(c.getInitiator(), c.getSeqID());
}
}
} catch (WriterException e) {
log.warn("PipelineStageWriter Exception: ", e);
} catch (InterruptedException e) {
log.warn("thread interrupted during addChunks(ChunkQueue)");
Thread.currentThread().interrupt();
break;
}
long now = System.currentTimeMillis();
long delta = MIN_POST_INTERVAL - now + lastPost;
if(delta > 0) {
Thread.sleep(delta); // wait for stuff to accumulate
}
lastPost = now;
} // end of try forever loop
log.info("received stop() command so exiting run() loop to shutdown connector");
} catch (OutOfMemoryError e) {
log.warn("Bailing out", e);
throw new RuntimeException("Shutdown pipeline connector.");
} catch (InterruptedException e) {
// do nothing, let thread die.
log.warn("Bailing out", e);
throw new RuntimeException("Shutdown pipeline connector.");
} catch (Throwable e) {
log.error("connector failed; shutting down agent: ", e);
throw new RuntimeException("Shutdown pipeline connector.");
}
}
@Override
public void reloadConfiguration() {
}
}