| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker.logging; |
| |
| import static org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.LEVELS; |
| |
| import com.fasterxml.jackson.core.JsonEncoding; |
| import com.fasterxml.jackson.core.JsonFactory; |
| import com.fasterxml.jackson.core.JsonGenerator; |
| import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import java.io.BufferedOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.text.SimpleDateFormat; |
| import java.util.Date; |
| import java.util.EnumMap; |
| import java.util.logging.ErrorManager; |
| import java.util.logging.Formatter; |
| import java.util.logging.Handler; |
| import java.util.logging.LogRecord; |
| import java.util.logging.SimpleFormatter; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi; |
| import org.apache.beam.runners.core.metrics.ExecutionStateTracker; |
| import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; |
| import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; |
| import org.apache.beam.runners.dataflow.worker.counters.NameContext; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.io.CountingOutputStream; |
| |
| /** |
| * Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception is represented using |
| * {@link Throwable#printStackTrace()}. |
| */ |
| public class DataflowWorkerLoggingHandler extends Handler { |
| private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String> |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL; |
| |
| static { |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class); |
| // Note that Google Cloud Logging only defines a fixed number of severities and maps "TRACE" |
| // onto "DEBUG" as seen here: https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud |
| // /blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865 |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE, "DEBUG"); |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG, "DEBUG"); |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO, "INFO"); |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE, "NOTICE"); |
| // Note that Google Cloud Logging only defines a fixed number of severities and maps "WARN" onto |
| // "WARNING" as seen here: https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud |
| // /blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865 |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN, "WARNING"); |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR, "ERROR"); |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL, "CRITICAL"); |
| } |
| |
| /** |
| * Formats the throwable as per {@link Throwable#printStackTrace()}. |
| * |
| * @param thrown The throwable to format. |
| * @return A string containing the contents of {@link Throwable#printStackTrace()}. |
| */ |
| public static String formatException(Throwable thrown) { |
| if (thrown == null) { |
| return null; |
| } |
| StringWriter sw = new StringWriter(); |
| try (PrintWriter pw = new PrintWriter(sw)) { |
| thrown.printStackTrace(pw); |
| } |
| return sw.toString(); |
| } |
| |
| /** Constructs a handler that writes to a rotating set of files. */ |
| public DataflowWorkerLoggingHandler(String filename, long sizeLimit) throws IOException { |
| this(new FileOutputStreamFactory(filename), sizeLimit); |
| } |
| |
| /** |
| * Constructs a handler that writes to arbitrary output streams. No rollover if sizeLimit is zero |
| * or negative. |
| */ |
| DataflowWorkerLoggingHandler(Supplier<OutputStream> factory, long sizeLimit) throws IOException { |
| this.outputStreamFactory = factory; |
| this.generatorFactory = new ObjectMapper().getFactory(); |
| this.sizeLimit = sizeLimit < 1 ? Long.MAX_VALUE : sizeLimit; |
| createOutputStream(); |
| } |
| |
| @Override |
| public synchronized void publish(LogRecord record) { |
| DataflowExecutionState currrentDataflowState = null; |
| ExecutionState currrentState = ExecutionStateTracker.getCurrentExecutionState(); |
| if (currrentState instanceof DataflowExecutionState) { |
| currrentDataflowState = (DataflowExecutionState) currrentState; |
| } |
| // It's okay to pass in the null state, publish() handles and tests this. |
| publish(currrentDataflowState, record); |
| } |
| |
| public synchronized void publish(DataflowExecutionState currentExecutionState, LogRecord record) { |
| if (!isLoggable(record)) { |
| return; |
| } |
| |
| rolloverOutputStreamIfNeeded(); |
| |
| try { |
| // Generating a JSON map like: |
| // {"timestamp": {"seconds": 1435835832, "nanos": 123456789}, ... "message": "hello"} |
| generator.writeStartObject(); |
| // Write the timestamp. |
| generator.writeFieldName("timestamp"); |
| generator.writeStartObject(); |
| generator.writeNumberField("seconds", record.getMillis() / 1000); |
| generator.writeNumberField("nanos", (record.getMillis() % 1000) * 1000000); |
| generator.writeEndObject(); |
| // Write the severity. |
| generator.writeObjectField( |
| "severity", |
| MoreObjects.firstNonNull(LEVELS.get(record.getLevel()), record.getLevel().getName())); |
| // Write the other labels. |
| writeIfNotEmpty("message", formatter.formatMessage(record)); |
| writeIfNotEmpty("thread", String.valueOf(record.getThreadID())); |
| writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId()); |
| writeIfNotEmpty("stage", DataflowWorkerLoggingMDC.getStageName()); |
| |
| if (currentExecutionState != null) { |
| NameContext nameContext = currentExecutionState.getStepName(); |
| if (nameContext != null) { |
| writeIfNotEmpty("step", nameContext.userName()); |
| } |
| } |
| writeIfNotEmpty("worker", DataflowWorkerLoggingMDC.getWorkerId()); |
| writeIfNotEmpty("work", DataflowWorkerLoggingMDC.getWorkId()); |
| writeIfNotEmpty("logger", record.getLoggerName()); |
| writeIfNotEmpty("exception", formatException(record.getThrown())); |
| generator.writeEndObject(); |
| generator.writeRaw(System.lineSeparator()); |
| } catch (IOException | RuntimeException e) { |
| reportFailure("Unable to publish", e, ErrorManager.WRITE_FAILURE); |
| } |
| |
| // This implementation is based on that of java.util.logging.FileHandler, which flushes in a |
| // synchronized context like this. Unfortunately the maximum throughput for generating log |
| // entries will be the inverse of the flush latency. That could be as little as one hundred |
| // log entries per second on some systems. For higher throughput this should be changed to |
| // batch publish operations while writes and flushes are in flight on a different thread. |
| flush(); |
| } |
| |
| public synchronized void publish(BeamFnApi.LogEntry logEntry) { |
| if (generator == null || logEntry == null) { |
| return; |
| } |
| |
| rolloverOutputStreamIfNeeded(); |
| |
| try { |
| // Generating a JSON map like: |
| // {"timestamp": {"seconds": 1435835832, "nanos": 123456789}, ... "message": "hello"} |
| generator.writeStartObject(); |
| // Write the timestamp. |
| generator.writeFieldName("timestamp"); |
| generator.writeStartObject(); |
| generator.writeNumberField("seconds", logEntry.getTimestamp().getSeconds()); |
| generator.writeNumberField("nanos", logEntry.getTimestamp().getNanos()); |
| generator.writeEndObject(); |
| // Write the severity. |
| generator.writeObjectField( |
| "severity", |
| MoreObjects.firstNonNull( |
| BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.get(logEntry.getSeverity()), |
| logEntry.getSeverity().getValueDescriptor().getName())); |
| // Write the other labels. |
| writeIfNotEmpty("message", logEntry.getMessage()); |
| writeIfNotEmpty("thread", logEntry.getThread()); |
| writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId()); |
| // TODO: Write the stage execution information by translating the currently execution |
| // instruction reference to a stage. |
| // writeIfNotNull("stage", ...); |
| writeIfNotEmpty("step", logEntry.getPrimitiveTransformReference()); |
| writeIfNotEmpty("worker", DataflowWorkerLoggingMDC.getWorkerId()); |
| // Id should match to id in //depot/google3/third_party/cloud/dataflow/worker/agent/sdk.go |
| writeIfNotEmpty("portability_worker_id", DataflowWorkerLoggingMDC.getSdkHarnessId()); |
| writeIfNotEmpty("work", logEntry.getInstructionReference()); |
| writeIfNotEmpty("logger", logEntry.getLogLocation()); |
| // TODO: Figure out a way to get exceptions transported across Beam Fn Logging API |
| writeIfNotEmpty("exception", logEntry.getTrace()); |
| generator.writeEndObject(); |
| generator.writeRaw(System.lineSeparator()); |
| } catch (IOException | RuntimeException e) { |
| reportFailure("Unable to publish", e, ErrorManager.WRITE_FAILURE); |
| } |
| |
| // This implementation is based on that of java.util.logging.FileHandler, which flushes in a |
| // synchronized context like this. Unfortunately the maximum throughput for generating log |
| // entries will be the inverse of the flush latency. That could be as little as one hundred |
| // log entries per second on some systems. For higher throughput this should be changed to |
| // batch publish operations while writes and flushes are in flight on a different thread. |
| flush(); |
| } |
| |
| /** |
| * Check if a LogRecord will be logged. |
| * |
| * <p>This method checks if the <tt>LogRecord</tt> has an appropriate level and whether it |
| * satisfies any <tt>Filter</tt>. It will also return false if the handler has been closed, or the |
| * LogRecord is null. |
| */ |
| @Override |
| public boolean isLoggable(LogRecord record) { |
| return generator != null && record != null && super.isLoggable(record); |
| } |
| |
| @Override |
| public synchronized void flush() { |
| try { |
| if (generator != null) { |
| generator.flush(); |
| } |
| } catch (IOException | RuntimeException e) { |
| reportFailure("Unable to flush", e, ErrorManager.FLUSH_FAILURE); |
| } |
| } |
| |
| @Override |
| public synchronized void close() { |
| // Flush any in-flight content, though there should not actually be any because |
| // the generator is currently flushed in the synchronized publish() method. |
| flush(); |
| // Close the generator and log file. |
| try { |
| if (generator != null) { |
| generator.close(); |
| } |
| } catch (IOException | RuntimeException e) { |
| reportFailure("Unable to close", e, ErrorManager.CLOSE_FAILURE); |
| } finally { |
| generator = null; |
| counter = null; |
| } |
| } |
| |
| /** Unique file generator. Uses filenames with timestamp. */ |
| private static final class FileOutputStreamFactory implements Supplier<OutputStream> { |
| private final String filepath; |
| private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy_MM_dd_hh_mm_ss_SSS"); |
| |
| public FileOutputStreamFactory(String filepath) { |
| this.filepath = filepath; |
| } |
| |
| @Override |
| public OutputStream get() { |
| try { |
| String filename = filepath + "." + formatter.format(new Date()) + ".log"; |
| return new BufferedOutputStream( |
| new FileOutputStream(new File(filename), true /* append */)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private void createOutputStream() throws IOException { |
| CountingOutputStream stream = new CountingOutputStream(outputStreamFactory.get()); |
| generator = generatorFactory.createGenerator(stream, JsonEncoding.UTF8); |
| counter = stream; |
| |
| // Avoid 1 space indent for every line. We already add a newline after each log record. |
| generator.setPrettyPrinter(new MinimalPrettyPrinter("")); |
| } |
| |
| /** |
| * Rollover to a new output stream (log file) if we have reached the size limit. Ensure that the |
| * rollover fails or succeeds atomically. |
| */ |
| private void rolloverOutputStreamIfNeeded() { |
| if (counter.getCount() < sizeLimit) { |
| return; |
| } |
| |
| try { |
| JsonGenerator old = generator; |
| createOutputStream(); |
| |
| try { |
| // Rollover successful. Attempt to close old stream, but ignore on failure. |
| old.close(); |
| } catch (IOException | RuntimeException e) { |
| reportFailure("Unable to close old log file", e, ErrorManager.CLOSE_FAILURE); |
| } |
| } catch (IOException | RuntimeException e) { |
| reportFailure("Unable to create new log file", e, ErrorManager.OPEN_FAILURE); |
| } |
| } |
| |
| /** Appends a JSON key/value pair if the specified val is not null. */ |
| private void writeIfNotEmpty(String name, String val) throws IOException { |
| if (val != null && !val.isEmpty()) { |
| generator.writeStringField(name, val); |
| } |
| } |
| |
| /** Report logging failure to ErrorManager. Does not throw. */ |
| private void reportFailure(String message, Exception e, int code) { |
| try { |
| ErrorManager manager = getErrorManager(); |
| if (manager != null) { |
| manager.error(message, e, code); |
| } |
| } catch (Throwable t) { |
| // Failed to report logging failure. No meaningful action left. |
| } |
| } |
| |
| // Null after close(). |
| private JsonGenerator generator; |
| private CountingOutputStream counter; |
| |
| private final long sizeLimit; |
| private final Supplier<OutputStream> outputStreamFactory; |
| private final JsonFactory generatorFactory; |
| private final Formatter formatter = new SimpleFormatter(); |
| } |