blob: 56688470dc793e34961c7175971784ccb344352d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.logging;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.DEBUG;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.ERROR;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.INFO;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.OFF;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.TRACE;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.WARN;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
/**
* Sets up {@link java.util.logging} configuration on the Dataflow worker with a rotating file
* logger. The file logger uses the {@link DataflowWorkerLoggingHandler} format. A user can override
* the logging level by customizing the options found within {@link DataflowWorkerLoggingOptions}. A
* user can override the location by specifying the Java system property
* "dataflow.worker.logging.basepath" and the file size in MB before rolling over to a new file by
* specifying the Java system property "dataflow.worker. loggging.filesize_mb". The default log
* level is INFO, the default location is a file named dataflow-json.log within the system temporary
* directory and the default file size is 1 GB.
*/
public class DataflowWorkerLoggingInitializer {
private static final String ROOT_LOGGER_NAME = "";
@VisibleForTesting
static final String DEFAULT_RUNNER_LOGGING_LOCATION =
new File(System.getProperty("java.io.tmpdir"), "dataflow-json.log").getPath();
@VisibleForTesting
static final String DEFAULT_SDK_LOGGING_LOCATION =
new File(System.getProperty("java.io.tmpdir"), "sdk-json.log").getPath();
@VisibleForTesting
public static final String RUNNER_FILEPATH_PROPERTY = "dataflow.worker.logging.filepath";
@VisibleForTesting
static final String SDK_FILEPATH_PROPERTY = "dataflow.worker.logging.sdkfilepath";
private static final String FILESIZE_MB_PROPERTY = "dataflow.worker.logging.filesize_mb";
private static final String SYSTEM_OUT_LOG_NAME = "System.out";
private static final String SYSTEM_ERR_LOG_NAME = "System.err";
static final ImmutableBiMap<Level, DataflowWorkerLoggingOptions.Level> LEVELS =
ImmutableBiMap.<Level, DataflowWorkerLoggingOptions.Level>builder()
.put(Level.OFF, OFF)
.put(Level.SEVERE, ERROR)
.put(Level.WARNING, WARN)
.put(Level.INFO, INFO)
.put(Level.FINE, DEBUG)
.put(Level.FINEST, TRACE)
.build();
/**
* This default log level is overridden by the log level found at {@code
* DataflowWorkerLoggingOptions#getDefaultWorkerLogLevel()}.
*/
private static final DataflowWorkerLoggingOptions.Level DEFAULT_LOG_LEVEL =
LEVELS.get(Level.INFO);
/* We need to store a reference to the configured loggers so that they are not
* garbage collected. java.util.logging only has weak references to the loggers
* so if they are garbage collection, our hierarchical configuration will be lost. */
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private static List<Logger> configuredLoggers = Lists.newArrayList();
private static DataflowWorkerLoggingHandler loggingHandler;
private static DataflowWorkerLoggingHandler sdkLoggingHandler;
private static PrintStream originalStdOut;
private static PrintStream originalStdErr = System.err;
private static boolean initialized = false;
private static DataflowWorkerLoggingHandler makeLoggingHandler(
String filepathProperty, String defaultFilePath) throws IOException {
String filepath = System.getProperty(filepathProperty, defaultFilePath);
int filesizeMb = Integer.parseInt(System.getProperty(FILESIZE_MB_PROPERTY, "1024"));
DataflowWorkerLoggingHandler handler =
new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L);
handler.setLevel(Level.ALL);
return handler;
}
public static PrintStream getOriginalStdErr() {
return originalStdErr;
}
/** Sets up the initial logging configuration. */
public static synchronized void initialize() {
if (initialized) {
return;
}
try {
loggingHandler =
makeLoggingHandler(RUNNER_FILEPATH_PROPERTY, DEFAULT_RUNNER_LOGGING_LOCATION);
Logger rootLogger = LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME);
Level logLevel = getJulLevel(DEFAULT_LOG_LEVEL);
rootLogger.setLevel(logLevel);
rootLogger.addHandler(loggingHandler);
originalStdOut = System.out;
originalStdErr = System.err;
System.setOut(
JulHandlerPrintStreamAdapterFactory.create(
loggingHandler, SYSTEM_OUT_LOG_NAME, Level.INFO));
System.setErr(
JulHandlerPrintStreamAdapterFactory.create(
loggingHandler, SYSTEM_ERR_LOG_NAME, Level.SEVERE));
// Initialize the SDK Logging Handler, which will only be used for the LoggingService
sdkLoggingHandler = makeLoggingHandler(SDK_FILEPATH_PROPERTY, DEFAULT_SDK_LOGGING_LOCATION);
initialized = true;
} catch (SecurityException | IOException | NumberFormatException e) {
throw new ExceptionInInitializerError(e);
}
}
/** Reconfigures logging with the passed in options. */
public static synchronized void configure(DataflowWorkerLoggingOptions options) {
if (!initialized) {
throw new RuntimeException("configure() called before initialize()");
}
if (options.getDefaultWorkerLogLevel() != null) {
Level defaultLevel = getJulLevel(options.getDefaultWorkerLogLevel());
LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).setLevel(defaultLevel);
}
if (options.getWorkerLogLevelOverrides() != null) {
for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
options.getWorkerLogLevelOverrides().entrySet()) {
Logger logger = Logger.getLogger(loggerOverride.getKey());
logger.setLevel(getJulLevel(loggerOverride.getValue()));
configuredLoggers.add(logger);
}
}
// If the options specify a level for messages logged to System.out/err, we need to reconfigure
// the corresponding stream adapter.
if (options.getWorkerSystemOutMessageLevel() != null) {
System.out.close();
System.setOut(
JulHandlerPrintStreamAdapterFactory.create(
loggingHandler,
SYSTEM_OUT_LOG_NAME,
getJulLevel(options.getWorkerSystemOutMessageLevel())));
}
if (options.getWorkerSystemErrMessageLevel() != null) {
System.err.close();
System.setErr(
JulHandlerPrintStreamAdapterFactory.create(
loggingHandler,
SYSTEM_ERR_LOG_NAME,
getJulLevel(options.getWorkerSystemErrMessageLevel())));
}
}
/**
* Returns the underlying {@link DataflowWorkerLoggingHandler}.
*
* <p>Generally, code should just use logging interface.
*/
public static DataflowWorkerLoggingHandler getLoggingHandler() {
if (!initialized) {
throw new RuntimeException("getLoggingHandler() called before initialize()");
}
return loggingHandler;
}
/**
* Returns the underlying {@link DataflowWorkerLoggingHandler} for logs from the SDK.
*
* <p>Initializes Dataflow worker logging if not initialized already.
*/
public static DataflowWorkerLoggingHandler getSdkLoggingHandler() {
if (!initialized) {
throw new RuntimeException("getSdkLoggingHandler() called before initialize()");
}
return sdkLoggingHandler;
}
private static Level getJulLevel(DataflowWorkerLoggingOptions.Level level) {
return LEVELS.inverse().get(level);
}
@VisibleForTesting
public static synchronized void reset() {
if (!initialized) {
return;
}
configuredLoggers = Lists.newArrayList();
System.setOut(originalStdOut);
System.setErr(originalStdErr);
JulHandlerPrintStreamAdapterFactory.reset();
initialized = false;
}
}