blob: 512c51ad4fcddce1be9fb8e916460166b544c84f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.flume.appender;
import org.apache.flume.SourceRunner;
import org.apache.flume.node.NodeConfiguration;
import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.helpers.NameUtil;
import org.apache.logging.log4j.util.PropertiesUtil;
import java.util.Locale;
import java.util.Properties;
/**
*
*/
public class FlumeEmbeddedManager extends AbstractFlumeManager {
/** Name for the Flume source */
protected static final String SOURCE_NAME = "log4j-source";
private static ManagerFactory factory = new FlumeManagerFactory();
private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
private static final String IN_MEMORY = "InMemory";
private final FlumeNode node;
private NodeConfiguration conf;
private final Log4jEventSource source;
private final String shortName;
/**
* Constructor
* @param name The unique name of this manager.
* @param node The Flume Node.
*/
protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
super(name);
this.node = node;
this.shortName = shortName;
final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
if (runner == null || runner.getSource() == null) {
throw new IllegalStateException("No Source has been created for Appender " + shortName);
}
source = (Log4jEventSource) runner.getSource();
}
/**
* Returns a FlumeEmbeddedManager.
* @param name The name of the manager.
* @param agents The agents to use.
* @param properties Properties for the embedded manager.
* @param batchSize The number of events to include in a batch.
* @param dataDir The directory where the Flume FileChannel should write to.
* @return A FlumeAvroManager.
*/
public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
int batchSize, final String dataDir) {
if (batchSize <= 0) {
batchSize = 1;
}
if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
throw new IllegalArgumentException("Either an Agent or properties are required");
} else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
}
final StringBuilder sb = new StringBuilder();
boolean first = true;
if (agents != null && agents.length > 0) {
sb.append("FlumeEmbedded[");
for (final Agent agent : agents) {
if (!first) {
sb.append(",");
}
sb.append(agent.getHost()).append(":").append(agent.getPort());
first = false;
}
sb.append("]");
} else {
String sep = "";
sb.append(name).append(":");
final StringBuilder props = new StringBuilder();
for (final Property prop : properties) {
props.append(sep);
props.append(prop.getName()).append("=").append(prop.getValue());
sep = ",";
}
sb.append(NameUtil.md5(props.toString()));
}
return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
new FactoryData(name, agents, properties, batchSize, dataDir));
}
@Override
public void send(final FlumeEvent event, final int delay, final int retries) {
source.send(event);
}
@Override
protected void releaseSub() {
node.stop();
}
/**
* Factory data.
*/
private static class FactoryData {
private final Agent[] agents;
private final Property[] properties;
private final int batchSize;
private final String dataDir;
private final String name;
/**
* Constructor.
* @param name The name of the Appender.
* @param agents The agents.
* @param properties The Flume configuration properties.
* @param batchSize The number of events to include in a batch.
* @param dataDir The directory where Flume should write to.
*/
public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
final String dataDir) {
this.name = name;
this.agents = agents;
this.batchSize = batchSize;
this.properties = properties;
this.dataDir = dataDir;
}
}
/**
* Avro Manager Factory.
*/
private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
/**
* Create the FlumeAvroManager.
* @param name The name of the entity to manage.
* @param data The data required to create the entity.
* @return The FlumeAvroManager.
*/
public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
try {
final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
data.dataDir);
final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
node.start();
return new FlumeEmbeddedManager(name, data.name, node);
} catch (final Exception ex) {
LOGGER.error("Could not create FlumeEmbeddedManager", ex);
}
return null;
}
private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
final int batchSize, String dataDir) {
final Properties props = new Properties();
if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
LOGGER.error("No Flume configuration provided");
throw new ConfigurationException("No Flume configuration provided");
}
if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
LOGGER.error("Agents and Flume configuration cannot both be specified");
throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
}
if (agents != null && agents.length > 0) {
props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
if (dataDir != null && dataDir.length() > 0) {
if (dataDir.equals(IN_MEMORY)) {
props.put(name + ".channels", "primary");
props.put(name + ".channels.primary.type", "memory");
} else {
props.put(name + ".channels", "primary");
props.put(name + ".channels.primary.type", "file");
if (!dataDir.endsWith(FiLE_SEP)) {
dataDir = dataDir + FiLE_SEP;
}
props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
props.put(name + ".channels.primary.dataDirs", dataDir + "data");
}
} else {
props.put(name + ".channels", "primary");
props.put(name + ".channels.primary.type", "file");
}
final StringBuilder sb = new StringBuilder();
String leading = "";
int priority = agents.length;
for (int i = 0; i < agents.length; ++i) {
sb.append(leading).append("agent").append(i);
leading = " ";
final String prefix = name + ".sinks.agent" + i;
props.put(prefix + ".channel", "primary");
props.put(prefix + ".type", "avro");
props.put(prefix + ".hostname", agents[i].getHost());
props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
props.put(prefix + ".batch-size", Integer.toString(batchSize));
props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
--priority;
}
props.put(name + ".sinks", sb.toString());
props.put(name + ".sinkgroups", "group1");
props.put(name + ".sinkgroups.group1.sinks", sb.toString());
props.put(name + ".sinkgroups.group1.processor.type", "failover");
final String sourceChannels = "primary";
props.put(name + ".channels", sourceChannels);
props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
} else {
String channels = null;
String[] sinks = null;
props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
for (final Property property : properties) {
final String key = property.getName();
if (key == null || key.length() == 0) {
final String msg = "A property name must be provided";
LOGGER.error(msg);
throw new ConfigurationException(msg);
}
final String upperKey = key.toUpperCase(Locale.ENGLISH);
if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
final String msg =
"Specification of the agent name is allowed in Flume Appender configuration: " + key;
LOGGER.error(msg);
throw new ConfigurationException(msg);
}
if (upperKey.startsWith("SOURCES.")) {
final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
LOGGER.error(msg);
throw new ConfigurationException(msg);
}
final String value = property.getValue();
if (value == null || value.length() == 0) {
final String msg = "A value for property " + key + " must be provided";
LOGGER.error(msg);
throw new ConfigurationException(msg);
}
if (upperKey.equals("CHANNELS")) {
channels = value.trim();
} else if (upperKey.equals("SINKS")) {
sinks = value.trim().split(" ");
}
props.put(name + '.' + key, value);
}
String sourceChannels = channels;
if (channels == null) {
sourceChannels = "primary";
props.put(name + ".channels", sourceChannels);
}
props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
if (sinks == null || sinks.length == 0) {
final String msg = "At least one Sink must be specified";
LOGGER.error(msg);
throw new ConfigurationException(msg);
}
}
return props;
}
}
}