blob: eda590c465d46f13639335c69217e0b79d937836 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.flume.appender;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.AvroSource;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.XMLConfigurationFactory;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.status.StatusLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPInputStream;
/**
 * Tests that events logged through Log4j reach Flume Avro sources started by this test.
 * <p>
 * Each test starts two {@link AvroSource} instances on localhost (ports {@code 12345} and
 * {@code 12346}), each feeding its own {@link MemoryChannel}, and then reconfigures Log4j from
 * {@code embedded.xml}. That configuration file is not part of this class; the tests assume it
 * delivers events to the first port and fails over to the second one when the first source is
 * stopped.
 * </p>
 */
public class FlumeEmbeddedAppenderTest {

    /** Log4j configuration file activated for the duration of each test. */
    private static final String CONFIG = "embedded.xml";

    /** Directory removed before and after every test so that runs do not see stale files. */
    private static final String FILE_CHANNEL_DIR = "target/file-channel";

    /** Port of the primary Avro source; the alternate source listens on the next port. */
    private static final int TEST_SERVER_PORT = 12345;

    /** Number of events logged and verified by the multi-event tests. */
    private static final int EVENT_COUNT = 10;

    private static LoggerContext ctx;

    private AvroSource primarySource;
    private AvroSource altSource;
    private Channel primaryChannel;
    private Channel alternateChannel;
    private String testPort;
    private String altPort;

    /**
     * Removes files left behind by a previous run before any test executes.
     */
    @BeforeClass
    public static void setupClass() {
        // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
        deleteFileChannelDirectory();
    }

    /**
     * Resets the status logger once all tests in this class have run.
     */
    @AfterClass
    public static void cleanupClass() {
        StatusLogger.getLogger().reset();
    }

    /**
     * Starts the primary and alternate Avro sources and then points Log4j at {@link #CONFIG}.
     *
     * @throws Exception if a source cannot be started or Log4j cannot be reconfigured.
     */
    @Before
    public void setUp() throws Exception {
        deleteFileChannelDirectory();

        primaryChannel = createMemoryChannel("Primary Memory");
        alternateChannel = createMemoryChannel("Alternate Memory");

        testPort = String.valueOf(TEST_SERVER_PORT);
        altPort = String.valueOf(TEST_SERVER_PORT + 1);
        primarySource = createAvroSource("Primary", testPort, primaryChannel);
        altSource = createAvroSource("Alternate", altPort, alternateChannel);

        primarySource.start();
        altSource.start();
        // Both sources must be up before logging starts, otherwise a failed alternate would only
        // surface later as a confusing failure in testFailover.
        assertStarted(primarySource);
        assertStarted(altSource);

        System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
        ctx = (LoggerContext) LogManager.getContext(false);
        ctx.reconfigure();
    }

    /**
     * Restores the default Log4j configuration, stops both sources and cleans up files and
     * Flume MBeans. The cleanup runs even when stopping a source fails so that one broken test
     * does not pollute the following ones.
     *
     * @throws Exception if a source does not stop cleanly.
     */
    @After
    public void teardown() throws Exception {
        try {
            System.clearProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
            // ctx is still null when setUp failed before the first reconfiguration.
            if (ctx != null) {
                ctx.reconfigure();
            }
            primarySource.stop();
            altSource.stop();
            assertStopped(primarySource);
            assertStopped(altSource);
        } finally {
            deleteFileChannelDirectory();
            unregisterFlumeMBeans();
        }
    }

    /**
     * Logs a single structured event and expects it on the primary channel.
     */
    @Test
    public void testLog4Event() throws InterruptedException, IOException {
        final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
        EventLogger.logEvent(msg);

        assertNextEventEndsWith(primaryChannel, "Test Log4j");

        primarySource.stop();
    }

    /**
     * Logs several events and expects them on the primary channel in the order they were logged.
     */
    @Test
    public void testMultiple() throws InterruptedException, IOException {
        logEvents("Test Multiple ");
        assertEventsReceived(primaryChannel, "Test Multiple ");

        primarySource.stop();
    }

    /**
     * Logs a batch that must arrive on the primary channel, stops the primary source, and then
     * expects a second batch on the alternate channel.
     */
    @Test
    public void testFailover() throws InterruptedException, IOException {
        final Logger logger = LogManager.getLogger("testFailover");
        logger.debug("Starting testFailover");

        logEvents("Test Primary ");
        assertEventsReceived(primaryChannel, "Test Primary ");

        // Give the AvroSink time to receive notification and notify the channel.
        Thread.sleep(500);
        primarySource.stop();

        logEvents("Test Alternate ");
        /* When running in Gump Flume consistently returns the last event from the primary channel
           after the failover, which fails this test */
        assertEventsReceived(alternateChannel, "Test Alternate ");
    }

    /**
     * Creates a named memory channel configured with an empty context.
     */
    private static Channel createMemoryChannel(final String name) {
        final Channel channel = new MemoryChannel();
        channel.setName(name);
        Configurables.configure(channel, new Context());
        return channel;
    }

    /**
     * Creates an Avro source bound to localhost on the given port that writes to the given channel.
     * The source is configured but not started.
     */
    private static AvroSource createAvroSource(final String name, final String port, final Channel channel) {
        final AvroSource source = new AvroSource();
        source.setName(name);

        final Context context = new Context();
        context.put("port", port);
        context.put("bind", "localhost");
        Configurables.configure(source, context);

        final List<Channel> channels = new ArrayList<Channel>();
        channels.add(channel);
        final ChannelSelector selector = new ReplicatingChannelSelector();
        selector.setChannels(channels);
        source.setChannelProcessor(new ChannelProcessor(selector));
        return source;
    }

    /**
     * Waits for the source to leave its starting phase and asserts that it ended up started.
     */
    private static void assertStarted(final AvroSource source) throws InterruptedException {
        Assert.assertTrue("Reached start or error (" + source.getName() + ")",
            LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR));
        Assert.assertEquals("Server is started (" + source.getName() + ")", LifecycleState.START,
            source.getLifecycleState());
    }

    /**
     * Waits for the source to finish stopping and asserts that it ended up stopped.
     */
    private static void assertStopped(final AvroSource source) throws InterruptedException {
        Assert.assertTrue("Reached stop or error (" + source.getName() + ")",
            LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
        Assert.assertEquals("Server is stopped (" + source.getName() + ")", LifecycleState.STOP,
            source.getLifecycleState());
    }

    /**
     * Logs {@link #EVENT_COUNT} structured events whose text is the prefix followed by the index.
     */
    private static void logEvents(final String prefix) {
        for (int i = 0; i < EVENT_COUNT; ++i) {
            final StructuredDataMessage msg = new StructuredDataMessage("Test", prefix + i, "Test");
            EventLogger.logEvent(msg);
        }
    }

    /**
     * Takes {@link #EVENT_COUNT} events from the channel and checks that they arrive in order.
     */
    private static void assertEventsReceived(final Channel channel, final String prefix) throws IOException {
        for (int i = 0; i < EVENT_COUNT; ++i) {
            assertNextEventEndsWith(channel, prefix + i);
        }
    }

    /**
     * Takes one event from the channel inside its own transaction and asserts that the
     * decompressed body ends with the expected text.
     * <p>
     * The transaction is rolled back and closed when an assertion fails, because Flume expects a
     * transaction to be committed or rolled back before it is closed.
     * </p>
     */
    private static void assertNextEventEndsWith(final Channel channel, final String expected) throws IOException {
        final Transaction transaction = channel.getTransaction();
        transaction.begin();
        boolean committed = false;
        try {
            final Event event = channel.take();
            Assert.assertNotNull("No event available on channel " + channel.getName(), event);
            final String body = getBody(event);
            Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
                " Received: " + body, body.endsWith(expected));
            transaction.commit();
            committed = true;
        } finally {
            if (!committed) {
                transaction.rollback();
            }
            transaction.close();
        }
    }

    /**
     * Decompresses the GZIP-compressed body of an event and returns it as text.
     *
     * @param event the event whose body is read.
     * @return the decompressed body.
     * @throws IOException if the body is not valid GZIP data.
     */
    private static String getBody(final Event event) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
        try {
            final byte[] buffer = new byte[1024];
            int n;
            while (-1 != (n = is.read(buffer))) {
                baos.write(buffer, 0, n);
            }
        } finally {
            // Closing releases the inflater held by the GZIP stream.
            is.close();
        }
        // Decoded with the platform default charset, exactly as before.
        // NOTE(review): confirm which charset the appender's layout uses and pass it explicitly.
        return new String(baos.toByteArray());
    }

    /**
     * Deletes the file channel directory and warns on stderr when that is not possible.
     */
    private static void deleteFileChannelDirectory() {
        final File file = new File(FILE_CHANNEL_DIR);
        if (!deleteFiles(file)) {
            System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
        }
    }

    /**
     * Unregisters every MBean in the {@code org.apache.flume.*} domains. Failures for individual
     * beans are reported on stdout and otherwise ignored, since this is best-effort cleanup.
     */
    private static void unregisterFlumeMBeans() throws javax.management.MalformedObjectNameException {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        final Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
        for (final ObjectName name : names) {
            try {
                server.unregisterMBean(name);
            } catch (final Exception ex) {
                System.out.println("Unable to unregister " + name.toString());
            }
        }
    }

    /**
     * Recursively deletes a file or directory.
     *
     * @param file the file or directory to delete; it does not have to exist.
     * @return {@code true} if nothing is left to delete, {@code false} if any entry could not be
     *         removed.
     */
    private static boolean deleteFiles(final File file) {
        boolean result = true;
        if (file.isDirectory()) {
            // listFiles() returns null when the directory cannot be read; in that case there is
            // nothing to recurse into and the delete() below reports the outcome.
            final File[] files = file.listFiles();
            if (files != null) {
                for (final File child : files) {
                    result &= deleteFiles(child);
                }
            }
        } else if (!file.exists()) {
            return true;
        }
        // Always attempt the delete, even if a child failed, to remove as much as possible.
        final boolean deleted = file.delete();
        return result && deleted;
    }
}