blob: e5c1e97ad90a9742e329ae67efe9c297eaf95ede [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.logging.log4j;
import static org.junit.Assert.*;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde;
import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde;
import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class TestStreamAppender {
static Logger log = Logger.getLogger(TestStreamAppender.class);
@After
public void tearDown() {
log.removeAllAppenders();
MockSystemProducer.listeners.clear();
MockSystemProducer.messagesReceived.clear();
MockSystemAdmin.createdStreamName = "";
}
@Test
public void testDefaultSerde() {
System.setProperty("samza.container.name", "samza-container-1");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions();
assertNotNull(systemProducerAppender.getSerde());
assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
}
@Test
public void testNonDefaultSerde() {
System.setProperty("samza.container.name", "samza-container-1");
String streamName = StreamAppender.getStreamName("log4jTest", "1");
Map<String, String> map = new HashMap<String, String>();
map.put("job.name", "log4jTest");
map.put("job.id", "1");
map.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
map.put("systems.mock.streams." + streamName + ".samza.msg.serde", "log4j-string");
map.put("task.log4j.system", "mock");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions();
assertNotNull(systemProducerAppender.getSerde());
assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
}
@Test
public void testSystemProducerAppenderInContainer() throws InterruptedException {
System.setProperty("samza.container.name", "samza-container-1");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions();
log.addAppender(systemProducerAppender);
List<String> messages = Lists.newArrayList("testing1", "testing2");
logAndVerifyMessages(messages);
}
@Test
public void testSystemProducerAppenderInAM() throws InterruptedException {
System.setProperty("samza.container.name", "samza-job-coordinator");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions();
log.addAppender(systemProducerAppender);
log.info("no-received"); // System isn't initialized yet, so this message should be dropped
systemProducerAppender.setupSystem();
MockSystemProducerAppender.systemInitialized = true;
List<String> messages = Lists.newArrayList("testing3", "testing4");
logAndVerifyMessages(messages);
}
@Test
public void testNoStreamCreationUponSetupByDefault() {
System.setProperty("samza.container.name", "samza-container-1");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions(); // setupSystem() called inside here.
log.addAppender(systemProducerAppender);
Assert.assertEquals("", MockSystemAdmin.createdStreamName);
}
@Test
public void testStreamCreationUpSetupWhenEnabled() {
System.setProperty("samza.container.name", "samza-container-1");
MapConfig mapConfig = new MapConfig(ImmutableMap.of(
"task.log4j.create.stream.enabled", "true", // Enable explicit stream creation
"job.name", "log4jTest",
"job.id", "1",
"systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName(),
"task.log4j.system", "mock"));
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(mapConfig);
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions(); // setupSystem() called inside here.
log.addAppender(systemProducerAppender);
Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamName);
}
@Test
public void testDefaultPartitionCount() {
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
Map<String, String> map = new HashMap<>();
map.put("job.name", "log4jTest");
map.put("job.id", "1");
map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
map.put("task.log4j.system", "mock");
map.put("job.container.count", "4");
systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
systemProducerAppender = new MockSystemProducerAppender();
systemProducerAppender.setPartitionCount(8);
Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
}
@Test
public void testExceptionsDoNotKillTransferThread() throws InterruptedException {
System.setProperty("samza.container.name", "samza-container-1");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions();
log.addAppender(systemProducerAppender);
List<String> messages = Lists.newArrayList("testing5", "testing6", "testing7");
// Set up latch
final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
MockSystemProducer.listeners.add((source, envelope) -> {
allMessagesSent.countDown();
if (allMessagesSent.getCount() == messages.size() - 1) {
throw new RuntimeException(); // Throw on the first message
}
});
// Log the messages
messages.forEach((message) -> log.info(message));
// Wait for messages
assertTrue("Thread did not send all messages. Count: " + allMessagesSent.getCount(),
allMessagesSent.await(60, TimeUnit.SECONDS));
}
@Test
public void testQueueTimeout() throws InterruptedException {
System.setProperty("samza.container.name", "samza-container-1");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
systemProducerAppender.queueTimeoutS = 1;
PatternLayout layout = new PatternLayout();
layout.setConversionPattern("%m");
systemProducerAppender.setLayout(layout);
systemProducerAppender.activateOptions();
log.addAppender(systemProducerAppender);
int extraMessageCount = 5;
int expectedMessagesSent = extraMessageCount - 1; // -1 because when the queue is drained there is one additional message that couldn't be added
List<String> messages = new ArrayList<>(StreamAppender.DEFAULT_QUEUE_SIZE + extraMessageCount);
for (int i = 0; i < StreamAppender.DEFAULT_QUEUE_SIZE + extraMessageCount; i++) {
messages.add(String.valueOf(i));
}
// Set up latch
final CountDownLatch allMessagesSent = new CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra messages
final CountDownLatch waitForTimeout = new CountDownLatch(1);
MockSystemProducer.listeners.add((source, envelope) -> {
allMessagesSent.countDown();
try {
waitForTimeout.await();
} catch (InterruptedException e) {
fail("Test could not run properly because of a thread interrupt.");
}
});
// Log the messages. This is where the timeout will happen!
messages.forEach((message) -> log.info(message));
assertEquals(messages.size() - expectedMessagesSent, systemProducerAppender.metrics.logMessagesDropped.getCount());
// Allow all the rest of the messages to send.
waitForTimeout.countDown();
// Wait for messages
assertTrue("Thread did not send all messages. Count: " + allMessagesSent.getCount(),
allMessagesSent.await(60, TimeUnit.SECONDS));
assertEquals(expectedMessagesSent, MockSystemProducer.messagesReceived.size());
}
private void logAndVerifyMessages(List<String> messages) throws InterruptedException {
// Set up latch
final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
// Log the messages
messages.forEach((message) -> log.info(message));
// Wait for messages
assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(),
allMessagesSent.await(60, TimeUnit.SECONDS));
// Verify
assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
for (int i = 0; i < messages.size(); i++) {
assertTrue("Message mismatch at index " + i,
new String((byte[]) MockSystemProducer.messagesReceived.get(i)).contains(asJsonMessageSegment(messages.get(i))));
}
}
private String asJsonMessageSegment(String message) {
return String.format("\"message\":\"%s\"", message);
}
/**
* a mock class which overrides the getConfig method in SystemProducerAppener
* for testing purpose. Because the environment variable where the config
* stays is difficult to test.
*/
class MockSystemProducerAppender extends StreamAppender {
private final Config config;
public MockSystemProducerAppender() {
Map<String, String> map = new HashMap<String, String>();
map.put("job.name", "log4jTest");
map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
map.put("task.log4j.system", "mock");
config = new MapConfig(map);
}
public MockSystemProducerAppender(Config config) {
this.config = config;
}
@Override
protected Config getConfig() {
return config;
}
}
}