blob: cd856f28fb02a181603a0ceabff9853e2cdc8b78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.flume.appender;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.ConfigurationFactory;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.logging.log4j.core.test.AvailablePortFinder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Preconditions;
/**
*
*/
public class FlumePersistentAppenderTest {
private static final String CONFIG = "persistent.xml";
private static final String HOSTNAME = "localhost";
private static LoggerContext ctx;
private EventCollector primary;
private EventCollector alternate;
@BeforeClass
public static void setupClass() {
// System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
final File file = new File("target/file-channel");
if (!deleteFiles(file)) {
System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
}
}
@AfterClass
public static void cleanupClass() {
StatusLogger.getLogger().reset();
}
@Before
public void setUp() throws Exception {
final File file = new File("target/persistent");
deleteFiles(file);
/*
* Clear out all other appenders associated with this logger to ensure we're
* only hitting the Avro appender.
*/
final int primaryPort = AvailablePortFinder.getNextAvailable();
final int altPort = AvailablePortFinder.getNextAvailable();
System.setProperty("primaryPort", Integer.toString(primaryPort));
System.setProperty("alternatePort", Integer.toString(altPort));
primary = new EventCollector(primaryPort);
alternate = new EventCollector(altPort);
System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
ctx = LoggerContext.getContext(false);
ctx.reconfigure();
}
@After
public void teardown() throws Exception {
System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
ctx.reconfigure();
primary.stop();
alternate.stop();
final File file = new File("target/file-channel");
deleteFiles(file);
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
final Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
for (final ObjectName name : names) {
try {
server.unregisterMBean(name);
} catch (final Exception ex) {
System.out.println("Unable to unregister " + name.toString());
}
}
}
@Test
public void testLog4Event() throws IOException {
final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
EventLogger.logEvent(msg);
final Event event = primary.poll();
Assert.assertNotNull(event);
final String body = getBody(event);
Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
body.endsWith("Test Log4j"));
}
@Test
public void testMultiple() {
for (int i = 0; i < 10; ++i) {
final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test");
msg.put("counter", Integer.toString(i));
EventLogger.logEvent(msg);
}
final boolean[] fields = new boolean[10];
for (int i = 0; i < 10; ++i) {
final Event event = primary.poll();
Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event);
final String value = event.getHeaders().get("counter");
Assert.assertNotNull("Missing 'counter' in map " + event.getHeaders() + ", i = " + i, value);
final int counter = Integer.parseInt(value);
if (fields[counter]) {
Assert.fail("Duplicate event");
} else {
fields[counter] = true;
}
}
for (int i = 0; i < 10; ++i) {
Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
}
}
@Test
public void testFailover() throws InterruptedException {
final Logger logger = LogManager.getLogger("testFailover");
logger.debug("Starting testFailover");
for (int i = 0; i < 10; ++i) {
final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
msg.put("counter", Integer.toString(i));
EventLogger.logEvent(msg);
}
boolean[] fields = new boolean[10];
for (int i = 0; i < 10; ++i) {
final Event event = primary.poll();
Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event);
final String value = event.getHeaders().get("counter");
Assert.assertNotNull("Missing counter", value);
final int counter = Integer.parseInt(value);
if (fields[counter]) {
Assert.fail("Duplicate event");
} else {
fields[counter] = true;
}
}
for (int i = 0; i < 10; ++i) {
Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
}
// Give the AvroSink time to receive notification and notify the channel.
Thread.sleep(500);
primary.stop();
for (int i = 0; i < 10; ++i) {
final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test");
msg.put("cntr", Integer.toString(i));
EventLogger.logEvent(msg);
}
fields = new boolean[10];
for (int i = 0; i < 10; ++i) {
final Event event = alternate.poll();
Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event);
final String value = event.getHeaders().get("cntr");
Assert.assertNotNull("Missing counter", value);
final int counter = Integer.parseInt(value);
if (fields[counter]) {
Assert.fail("Duplicate event");
} else {
fields[counter] = true;
}
}
for (int i = 0; i < 10; ++i) {
Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
}
}
@Test
public void testSingle() throws IOException {
final Logger logger = LogManager.getLogger("EventLogger");
final Marker marker = MarkerManager.getMarker("EVENT");
logger.info(marker, "This is a test message");
final Event event = primary.poll();
Assert.assertNotNull(event);
final String body = getBody(event);
Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
body.endsWith("This is a test message"));
}
@Test
public void testMultipleConcurrent() throws InterruptedException {
final int eventsCount = 10000;
final Thread writer1 = new WriterThread(0, eventsCount / 4);
final Thread writer2 = new WriterThread(eventsCount / 4, eventsCount / 2);
final Thread writer3 = new WriterThread(eventsCount / 2, (3 * eventsCount) / 4);
final Thread writer4 = new WriterThread((3 * eventsCount) / 4, eventsCount);
writer1.start();
writer2.start();
writer3.start();
writer4.start();
final boolean[] fields = new boolean[eventsCount];
final Thread reader1 = new ReaderThread(0, eventsCount / 4, fields);
final Thread reader2 = new ReaderThread(eventsCount / 4, eventsCount / 2, fields);
final Thread reader3 = new ReaderThread(eventsCount / 2, (eventsCount * 3) / 4, fields);
final Thread reader4 = new ReaderThread((eventsCount * 3) / 4, eventsCount, fields);
reader1.start();
reader2.start();
reader3.start();
reader4.start();
writer1.join();
writer2.join();
writer3.join();
writer4.join();
reader1.join();
reader2.join();
reader3.join();
reader4.join();
for (int i = 0; i < eventsCount; ++i) {
Assert.assertTrue(
"Channel contained event, but not expected message " + i,
fields[i]);
}
}
@Test
public void testRFC5424Layout() throws IOException {
final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
EventLogger.logEvent(msg);
final Event event = primary.poll();
Assert.assertNotNull(event);
final String body = getBody(event);
Assert.assertTrue("Structured message does not contain @EID: " + body,
body.contains("Test@18060"));
}
private class WriterThread extends Thread {
private final int start;
private final int stop;
public WriterThread(final int start, final int stop) {
this.start = start;
this.stop = stop;
}
@Override
public void run() {
for (int i = start; i < stop; ++i) {
final StructuredDataMessage msg = new StructuredDataMessage(
"Test", "Test Multiple " + i, "Test");
msg.put("counter", Integer.toString(i));
EventLogger.logEvent(msg);
}
}
}
private class ReaderThread extends Thread {
private final int start;
private final int stop;
private final boolean[] fields;
private ReaderThread(final int start, final int stop, final boolean[] fields) {
this.start = start;
this.stop = stop;
this.fields = fields;
}
@Override
public void run() {
for (int i = start; i < stop; ++i) {
Event event = primary.poll();
while (event == null) {
event = primary.poll();
}
Assert.assertNotNull("Received " + i + " events. Event "
+ (i + 1) + " is null", event);
final String value = event.getHeaders().get("counter");
Assert.assertNotNull("Missing counter", value);
final int counter = Integer.parseInt(value);
if (fields[counter]) {
Assert.fail("Duplicate event");
} else {
fields[counter] = true;
}
}
}
}
@Test
public void testLogInterrupted() {
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
executor.shutdownNow();
final Logger logger = LogManager.getLogger("EventLogger");
final Marker marker = MarkerManager.getMarker("EVENT");
logger.info(marker, "This is a test message");
Assert.assertTrue("Interruption status not preserved",
Thread.currentThread().isInterrupted());
});
}
/*
@Test
public void testPerformance() throws Exception {
long start = System.currentTimeMillis();
int count = 1000;
for (int i = 0; i < count; ++i) {
final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
msg.put("counter", Integer.toString(i));
EventLogger.logEvent(msg);
}
long elapsed = System.currentTimeMillis() - start;
System.out.println("Time to log " + count + " events " + elapsed + "ms");
} */
private String getBody(final Event event) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
int n = 0;
while (-1 != (n = is.read())) {
baos.write(n);
}
return new String(baos.toByteArray());
}
private static boolean deleteFiles(final File file) {
boolean result = true;
if (file.isDirectory()) {
final File[] files = file.listFiles();
if (files != null) {
for (final File child : files) {
result &= deleteFiles(child);
}
}
} else if (!file.exists()) {
return true;
}
return result && file.delete();
}
private static class EventCollector implements AvroSourceProtocol {
private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new LinkedBlockingQueue<>();
private final NettyServer nettyServer;
public EventCollector(final int port) {
final Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
nettyServer = new NettyServer(responder, new InetSocketAddress(HOSTNAME, port));
nettyServer.start();
}
public void stop() {
nettyServer.close();
}
public Event poll() {
AvroFlumeEvent avroEvent = null;
try {
avroEvent = eventQueue.poll(30000, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ie) {
// Ignore the exception.
}
if (avroEvent != null) {
return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders()));
}
System.out.println("No Event returned");
return null;
}
@Override
public Status append(final AvroFlumeEvent event) throws AvroRemoteException {
eventQueue.add(event);
//System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
return Status.OK;
}
@Override
public Status appendBatch(final List<AvroFlumeEvent> events) throws AvroRemoteException {
Preconditions.checkState(eventQueue.addAll(events));
for (final AvroFlumeEvent event : events) {
// System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
}
return Status.OK;
}
}
private static Map<String, String> toStringMap(final Map<CharSequence, CharSequence> charSeqMap) {
final Map<String, String> stringMap = new HashMap<>();
for (final Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
stringMap.put(entry.getKey().toString(), entry.getValue().toString());
}
return stringMap;
}
}