blob: 8b6164f1f3cf5981ba1fd565757e01e6b3220047 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.logging;
import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.DEFAULT_RUNNER_LOGGING_LOCATION;
import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.DEFAULT_SDK_LOGGING_LOCATION;
import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY;
import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.SDK_FILEPATH_PROPERTY;
import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.getLoggingHandler;
import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.getSdkLoggingHandler;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.LoggerFactory;
/**
* Unit tests for {@link DataflowWorkerLoggingInitializer}.
*
* <p>Tests which validate written log messages should assume that other background tasks may
* concurrently be writing log messages, since registered log handlers are global. Therefore it is
* not safe to assert on log counts or whether the retrieved log collection is empty.
*/
@RunWith(JUnit4.class)
public class DataflowWorkerLoggingInitializerTest {
@Rule public TemporaryFolder logFolder = new TemporaryFolder();
@Rule public RestoreSystemProperties restoreProperties = new RestoreSystemProperties();
// Should match {@link DataflowWorkerLoggingInitializer#FILEPATH_PROPERTY}
private static final String LOGPATH_PROPERTY = "dataflow.worker.logging.filepath";
@Before
public void setUp() {
Path logFileBasePath = Paths.get(logFolder.getRoot().getAbsolutePath(), "logfile.txt");
System.setProperty(LOGPATH_PROPERTY, logFileBasePath.toString());
LogManager.getLogManager().reset();
DataflowWorkerLoggingInitializer.reset();
DataflowWorkerLoggingInitializer.initialize();
}
@After
public void tearDown() {
LogManager.getLogManager().reset();
DataflowWorkerLoggingInitializer.reset();
}
@Test
public void testWithDefaults() {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
DataflowWorkerLoggingInitializer.configure(options);
Logger rootLogger = LogManager.getLogManager().getLogger("");
assertEquals(1, rootLogger.getHandlers().length);
assertEquals(Level.INFO, rootLogger.getLevel());
assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0], Level.ALL);
}
@Test
public void testWithConfigurationOverride() {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.WARN);
DataflowWorkerLoggingInitializer.configure(options);
Logger rootLogger = LogManager.getLogManager().getLogger("");
assertEquals(1, rootLogger.getHandlers().length);
assertEquals(Level.WARNING, rootLogger.getLevel());
assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0], Level.ALL);
}
@Test
public void testWithCustomLogLevels() {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setWorkerLogLevelOverrides(
new WorkerLogLevelOverrides()
.addOverrideForName("A", DataflowWorkerLoggingOptions.Level.DEBUG)
.addOverrideForName("B", DataflowWorkerLoggingOptions.Level.ERROR));
DataflowWorkerLoggingInitializer.configure(options);
Logger aLogger = LogManager.getLogManager().getLogger("A");
assertEquals(0, aLogger.getHandlers().length);
assertEquals(Level.FINE, aLogger.getLevel());
assertTrue(aLogger.getUseParentHandlers());
Logger bLogger = LogManager.getLogManager().getLogger("B");
assertEquals(Level.SEVERE, bLogger.getLevel());
assertEquals(0, bLogger.getHandlers().length);
assertTrue(aLogger.getUseParentHandlers());
}
private void assertIsDataflowWorkerLoggingHandler(Handler handler, Level level) {
assertThat(handler, instanceOf(DataflowWorkerLoggingHandler.class));
assertEquals(level, handler.getLevel());
}
@Test
public void testStrictGlobalFilterAndRelaxedOverride() throws IOException {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.ERROR);
options.setWorkerLogLevelOverrides(
new WorkerLogLevelOverrides()
.addOverrideForName("A", DataflowWorkerLoggingOptions.Level.INFO));
DataflowWorkerLoggingInitializer.configure(options);
LogManager.getLogManager().getLogger("A").info("foobar");
verifyLogOutput("foobar");
}
@Test
public void testSystemOutToLogger() throws Throwable {
System.out.println("afterInitialization");
verifyLogOutput("afterInitialization");
}
@Test
public void testSystemErrToLogger() throws Throwable {
System.err.println("afterInitialization");
verifyLogOutput("afterInitialization");
}
@Test
public void testSystemOutRespectsFilterConfig() throws IOException {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.ERROR);
DataflowWorkerLoggingInitializer.configure(options);
System.out.println("sys.out");
System.err.println("sys.err");
List<String> actualLines = retrieveLogLines();
assertThat(actualLines, not(hasItem(containsString("sys.out"))));
assertThat(actualLines, hasItem(containsString("sys.err")));
}
@Test
public void testSystemOutLevelOverrides() throws IOException {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setWorkerSystemOutMessageLevel(DataflowWorkerLoggingOptions.Level.WARN);
DataflowWorkerLoggingInitializer.configure(options.as(DataflowWorkerLoggingOptions.class));
System.out.println("foobar");
verifyLogOutput("WARN");
}
@Test
public void testSystemOutCustomLogLevel() throws IOException {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setWorkerLogLevelOverrides(
new WorkerLogLevelOverrides()
.addOverrideForName("System.out", DataflowWorkerLoggingOptions.Level.ERROR));
DataflowWorkerLoggingInitializer.configure(options);
System.out.println("sys.out");
List<String> actualLines = retrieveLogLines();
// N.B.: It's not safe to assert that actualLines is "empty" since the logging framework is
// global and logs may be concurrently written by other infrastructure.
assertThat(actualLines, not(hasItem(containsString("sys.out"))));
}
@Test
public void testSystemErrLevelOverrides() throws IOException {
DataflowWorkerLoggingOptions options =
PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
options.setWorkerSystemErrMessageLevel(DataflowWorkerLoggingOptions.Level.WARN);
DataflowWorkerLoggingInitializer.configure(options.as(DataflowWorkerLoggingOptions.class));
System.err.println("foobar");
verifyLogOutput("WARN");
}
/**
* Verify we can handle additional user logging configuration. Specifically, ensure that we
* gracefully handle adding an additional log handler which forwards to stdout.
*/
@Test
public void testUserHandlerForwardsStdOut() throws Throwable {
registerRootLogHandler(new StdOutLogHandler());
org.slf4j.Logger log = LoggerFactory.getLogger(DataflowWorkerLoggingInitializerTest.class);
log.info("foobar");
verifyLogOutput("foobar");
}
@Test
public void testLoggingHandlersAreDifferent() {
assertThat(getLoggingHandler(), not(getSdkLoggingHandler()));
assertThat(DEFAULT_RUNNER_LOGGING_LOCATION, not(DEFAULT_SDK_LOGGING_LOCATION));
assertThat(RUNNER_FILEPATH_PROPERTY, not(SDK_FILEPATH_PROPERTY));
}
static class StdOutLogHandler extends java.util.logging.Handler {
@Override
public void publish(LogRecord record) {
System.out.println(record.getMessage());
}
@Override
public void flush() {}
@Override
public void close() {}
}
private void registerRootLogHandler(Handler handler) {
Logger rootLogger = LogManager.getLogManager().getLogger("");
rootLogger.addHandler(handler);
}
private void verifyLogOutput(String substring) throws IOException {
List<String> logLines = retrieveLogLines();
assertThat(logLines, hasItem(containsString(substring)));
}
private List<String> retrieveLogLines() throws IOException {
List<String> allLogLines = Lists.newArrayList();
for (File logFile : logFolder.getRoot().listFiles()) {
allLogLines.addAll(Files.readAllLines(logFile.toPath(), StandardCharsets.UTF_8));
}
return allLogLines;
}
}