blob: cc2b02c2bd0f76a6508afb19f176f5edbb03253e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.flume.appender;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.AvroSource;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.status.StatusLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
/**
*
*/
public class FlumeAppenderTest {
private static LoggerContext ctx;
private static final int testServerPort = 12345;
private AvroSource eventSource;
private Channel channel;
private Logger avroLogger;
private String testPort;
@BeforeClass
public static void setupClass() {
StatusLogger.getLogger().setLevel(Level.OFF);
ctx = (LoggerContext) LogManager.getContext();
}
@AfterClass
public static void cleanupClass() {
}
@Before
public void setUp() throws Exception {
eventSource = new AvroSource();
channel = new MemoryChannel();
Configurables.configure(channel, new Context());
avroLogger = (Logger) LogManager.getLogger("avrologger");
/*
* Clear out all other appenders associated with this logger to ensure we're
* only hitting the Avro appender.
*/
removeAppenders(avroLogger);
final Context context = new Context();
testPort = String.valueOf(testServerPort);
context.put("port", testPort);
context.put("bind", "0.0.0.0");
Configurables.configure(eventSource, context);
final List<Channel> channels = new ArrayList<Channel>();
channels.add(channel);
final ChannelSelector cs = new ReplicatingChannelSelector();
cs.setChannels(channels);
eventSource.setChannelProcessor(new ChannelProcessor(cs));
eventSource.start();
Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
eventSource, LifecycleState.START_OR_ERROR));
Assert.assertEquals("Server is started", LifecycleState.START, eventSource.getLifecycleState());
}
@After
public void teardown() throws Exception {
removeAppenders(avroLogger);
eventSource.stop();
Assert.assertTrue("Reached stop or error",
LifecycleController.waitForOneOf(eventSource, LifecycleState.STOP_OR_ERROR));
Assert.assertEquals("Server is stopped", LifecycleState.STOP,
eventSource.getLifecycleState());
}
@Test
public void testLog4jAvroAppender() throws InterruptedException, IOException {
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
Assert.assertNotNull(avroLogger);
avroLogger.info("Test message");
final Transaction transaction = channel.getTransaction();
transaction.begin();
final Event event = channel.take();
Assert.assertNotNull(event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Test message"));
transaction.commit();
transaction.close();
eventSource.stop();
}
@Test
public void testStructured() throws InterruptedException, IOException {
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger");
Assert.assertNotNull(eventLogger);
eventLogger.addAppender(avroAppender);
eventLogger.setLevel(Level.ALL);
final StructuredDataMessage msg = new StructuredDataMessage("Transer", "Success", "Audit");
msg.put("memo", "This is a memo");
msg.put("acct", "12345");
msg.put("amount", "100.00");
ThreadContext.put("id", UUID.randomUUID().toString());
ThreadContext.put("memo", null);
ThreadContext.put("test", "123");
EventLogger.logEvent(msg);
final Transaction transaction = channel.getTransaction();
transaction.begin();
final Event event = channel.take();
Assert.assertNotNull(event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Success"));
transaction.commit();
transaction.close();
eventSource.stop();
eventLogger.removeAppender(avroAppender);
avroAppender.stop();
}
@Test
public void testMultiple() throws InterruptedException, IOException {
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
Assert.assertNotNull(avroLogger);
for (int i = 0; i < 10; ++i) {
avroLogger.info("Test message " + i);
}
for (int i = 0; i < 10; ++i) {
final Transaction transaction = channel.getTransaction();
transaction.begin();
final Event event = channel.take();
Assert.assertNotNull(event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Test message " + i));
transaction.commit();
transaction.close();
}
eventSource.stop();
}
@Test
public void testBatch() throws InterruptedException, IOException {
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "10", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
Assert.assertNotNull(avroLogger);
for (int i = 0; i < 10; ++i) {
avroLogger.info("Test message " + i);
}
final Transaction transaction = channel.getTransaction();
transaction.begin();
for (int i = 0; i < 10; ++i) {
final Event event = channel.take();
Assert.assertNotNull("No event for item " + i, event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Test message " + i));
}
transaction.commit();
transaction.close();
eventSource.stop();
}
@Test
public void testConnectionRefused() {
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
eventSource.stop();
boolean caughtException = false;
try {
avroLogger.info("message 1");
} catch (final Throwable t) {
//logger.debug("Logging to a non-existant server failed (as expected)", t);
caughtException = true;
}
Assert.assertTrue(caughtException);
}
@Test
public void testNotConnected() throws Exception {
eventSource.stop();
final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
Agent.createAgent("localhost", altPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
Assert.assertTrue("Appender Not started", avroAppender.isStarted());
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
try {
avroLogger.info("Test message");
Assert.fail("Exception should have been thrown");
} catch (Exception ex) {
}
try {
final Context context = new Context();
context.put("port", altPort);
context.put("bind", "0.0.0.0");
Configurables.configure(eventSource, context);
eventSource.start();
} catch (final ChannelException e) {
Assert.fail("Caught exception while resetting port to " + altPort + " : " + e.getMessage());
}
avroLogger.info("Test message 2");
Transaction transaction = channel.getTransaction();
transaction.begin();
Event event = channel.take();
Assert.assertNotNull(event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Test message 2"));
transaction.commit();
transaction.close();
}
@Test
public void testReconnect() throws Exception {
final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
Agent.createAgent("localhost", altPort)};
final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
"false", null, null, null, null, null, "true", "1", null, null, null);
avroAppender.start();
avroLogger.addAppender(avroAppender);
avroLogger.setLevel(Level.ALL);
avroLogger.info("Test message");
Transaction transaction = channel.getTransaction();
transaction.begin();
Event event = channel.take();
Assert.assertNotNull(event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Test message"));
transaction.commit();
transaction.close();
eventSource.stop();
try {
final Context context = new Context();
context.put("port", altPort);
context.put("bind", "0.0.0.0");
Configurables.configure(eventSource, context);
eventSource.start();
} catch (final ChannelException e) {
Assert.fail("Caught exception while resetting port to " + altPort + " : " + e.getMessage());
}
avroLogger.info("Test message 2");
transaction = channel.getTransaction();
transaction.begin();
event = channel.take();
Assert.assertNotNull(event);
Assert.assertTrue("Channel contained event, but not expected message",
getBody(event).endsWith("Test message 2"));
transaction.commit();
transaction.close();
}
private void removeAppenders(final Logger logger) {
final Map<String,Appender<?>> map = logger.getAppenders();
for (final Map.Entry<String, Appender<?>> entry : map.entrySet()) {
final Appender<?> app = entry.getValue();
avroLogger.removeAppender(app);
app.stop();
}
}
private Appender<?> getAppender(final Logger logger, final String name) {
return logger.getAppenders().get(name);
}
private String getBody(final Event event) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
int n = 0;
while (-1 != (n = is.read())) {
baos.write(n);
}
return new String(baos.toByteArray());
}
}