blob: 874d241be1cbb8942b356391147f8fefdd60c611 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.flume.appender;
import org.apache.flume.Channel;
import org.apache.flume.ChannelFactory;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.SinkFactory;
import org.apache.flume.SinkProcessor;
import org.apache.flume.SinkRunner;
import org.apache.flume.Source;
import org.apache.flume.SourceFactory;
import org.apache.flume.SourceRunner;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.ChannelSelectorFactory;
import org.apache.flume.channel.DefaultChannelFactory;
import org.apache.flume.conf.BasicConfigurationConstants;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.conf.Configurables;
import org.apache.flume.conf.FlumeConfiguration;
import org.apache.flume.conf.FlumeConfigurationError;
import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
import org.apache.flume.conf.file.SimpleNodeConfiguration;
import org.apache.flume.conf.sink.SinkConfiguration;
import org.apache.flume.conf.sink.SinkGroupConfiguration;
import org.apache.flume.conf.source.SourceConfiguration;
import org.apache.flume.node.NodeConfiguration;
import org.apache.flume.node.nodemanager.NodeConfigurationAware;
import org.apache.flume.sink.DefaultSinkFactory;
import org.apache.flume.sink.DefaultSinkProcessor;
import org.apache.flume.sink.SinkGroup;
import org.apache.flume.source.DefaultSourceFactory;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.status.StatusLogger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
/**
* See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
*/
public class FlumeConfigurationBuilder {
private static final Logger LOGGER = StatusLogger.getLogger();
private final ChannelFactory channelFactory = new DefaultChannelFactory();
private final SourceFactory sourceFactory = new DefaultSourceFactory();
private final SinkFactory sinkFactory = new DefaultSinkFactory();
public NodeConfiguration load(final String name, final Properties props,
final NodeConfigurationAware configurationAware) {
final NodeConfiguration conf = new SimpleNodeConfiguration();
FlumeConfiguration fconfig;
try {
fconfig = new FlumeConfiguration(props);
final List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
if (errors.size() > 0) {
boolean isError = false;
for (final FlumeConfigurationError error : errors) {
final StringBuilder sb = new StringBuilder();
sb.append("Component: ").append(error.getComponentName()).append(" ");
sb.append("Key: ").append(error.getKey()).append(" ");
sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
switch (error.getErrorOrWarning()) {
case ERROR:
isError = true;
LOGGER.error(sb.toString());
break;
case WARNING:
LOGGER.warn(sb.toString());
break;
}
}
if (isError) {
throw new ConfigurationException("Unable to configure Flume due to errors");
}
}
} catch (final RuntimeException ex) {
printProps(props);
throw ex;
}
final FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
if (agentConf != null) {
loadChannels(agentConf, conf);
loadSources(agentConf, conf);
loadSinks(agentConf, conf);
configurationAware.startAllComponents(conf);
} else {
LOGGER.warn("No configuration found for: {}", name);
}
return conf;
}
private void printProps(final Properties props) {
for (final String key : new TreeSet<String>(props.stringPropertyNames())) {
LOGGER.error(key + "=" + props.getProperty(key));
}
}
protected void loadChannels(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
LOGGER.info("Creating channels");
final Set<String> channels = agentConf.getChannelSet();
final Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
for (final String chName : channels) {
final ComponentConfiguration comp = compMap.get(chName);
if (comp != null) {
final Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
Configurables.configure(channel, comp);
conf.getChannels().put(comp.getComponentName(), channel);
}
}
for (final String ch : channels) {
final Context context = agentConf.getChannelContext().get(ch);
if (context != null) {
final Channel channel = channelFactory.create(ch,
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
Configurables.configure(channel, context);
conf.getChannels().put(ch, channel);
LOGGER.info("created channel " + ch);
}
}
}
protected void loadSources(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
final Set<String> sources = agentConf.getSourceSet();
final Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
for (final String sourceName : sources) {
final ComponentConfiguration comp = compMap.get(sourceName);
if (comp != null) {
final SourceConfiguration config = (SourceConfiguration) comp;
final Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
Configurables.configure(source, config);
final Set<String> channelNames = config.getChannels();
final List<Channel> channels = new ArrayList<Channel>();
for (final String chName : channelNames) {
channels.add(conf.getChannels().get(chName));
}
final ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);
source.setChannelProcessor(channelProcessor);
conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
}
}
final Map<String, Context> sourceContexts = agentConf.getSourceContext();
for (final String src : sources) {
final Context context = sourceContexts.get(src);
if (context != null) {
final Source source = sourceFactory.create(src,
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
final List<Channel> channels = new ArrayList<Channel>();
Configurables.configure(source, context);
final String[] channelNames =
context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
for (final String chName : channelNames) {
channels.add(conf.getChannels().get(chName));
}
final Map<String, String> selectorConfig = context.getSubProperties(
BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, context);
source.setChannelProcessor(channelProcessor);
conf.getSourceRunners().put(src, SourceRunner.forSource(source));
}
}
}
protected void loadSinks(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
final Set<String> sinkNames = agentConf.getSinkSet();
final Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
final Map<String, Sink> sinks = new HashMap<String, Sink>();
for (final String sinkName : sinkNames) {
final ComponentConfiguration comp = compMap.get(sinkName);
if (comp != null) {
final SinkConfiguration config = (SinkConfiguration) comp;
final Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
Configurables.configure(sink, config);
sink.setChannel(conf.getChannels().get(config.getChannel()));
sinks.put(comp.getComponentName(), sink);
}
}
final Map<String, Context> sinkContexts = agentConf.getSinkContext();
for (final String sinkName : sinkNames) {
final Context context = sinkContexts.get(sinkName);
if (context != null) {
final Sink sink = sinkFactory.create(sinkName,
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
Configurables.configure(sink, context);
sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
sinks.put(sinkName, sink);
}
}
loadSinkGroups(agentConf, sinks, conf);
}
protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf,
final Map<String, Sink> sinks, final NodeConfiguration conf) {
final Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
final Map<String, String> usedSinks = new HashMap<String, String>();
for (final String groupName : sinkgroupNames) {
final ComponentConfiguration comp = compMap.get(groupName);
if (comp != null) {
final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
final List<String> groupSinkList = groupConf.getSinks();
final List<Sink> groupSinks = new ArrayList<Sink>();
for (final String sink : groupSinkList) {
final Sink s = sinks.remove(sink);
if (s == null) {
final String sinkUser = usedSinks.get(sink);
if (sinkUser != null) {
throw new ConfigurationException(String.format(
"Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
} else {
throw new ConfigurationException(String.format(
"Sink %s of group %s does not exist or is not properly configured", sink,
groupName));
}
}
groupSinks.add(s);
usedSinks.put(sink, groupName);
}
final SinkGroup group = new SinkGroup(groupSinks);
Configurables.configure(group, groupConf);
conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
}
}
// add any unasigned sinks to solo collectors
for (final Map.Entry<String, Sink> entry : sinks.entrySet()) {
if (!usedSinks.containsValue(entry.getKey())) {
final SinkProcessor pr = new DefaultSinkProcessor();
final List<Sink> sinkMap = new ArrayList<Sink>();
sinkMap.add(entry.getValue());
pr.setSinks(sinkMap);
Configurables.configure(pr, new Context());
conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
}
}
}
}