| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.history.logging.proto; |
| |
| import java.io.IOException; |
| import java.time.LocalDate; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.HistoryEvent; |
| import org.apache.tez.dag.history.HistoryEventType; |
| import org.apache.tez.dag.history.events.DAGFinishedEvent; |
| import org.apache.tez.dag.history.logging.HistoryLoggingService; |
| import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto; |
| import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Logging service to write history events serialized using protobuf into sequence files. |
| * This can be used as external tables in hive. Or the reader can be used independently to |
| * read the data from these files. |
| */ |
| public class ProtoHistoryLoggingService extends HistoryLoggingService { |
| private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryLoggingService.class); |
| // The file suffix used if we are writing start events and rest into different files. |
| static final String SPLIT_DAG_EVENTS_FILE_SUFFIX = "_1"; |
| |
| private final HistoryEventProtoConverter converter = |
| new HistoryEventProtoConverter(); |
| private boolean loggingDisabled = false; |
| |
| private LinkedBlockingQueue<DAGHistoryEvent> eventQueue; |
| private Thread eventHandlingThread; |
| private final AtomicBoolean stopped = new AtomicBoolean(false); |
| |
| private TezProtoLoggers loggers; |
| private ProtoMessageWriter<HistoryEventProto> appEventsWriter; |
| private ProtoMessageWriter<HistoryEventProto> dagEventsWriter; |
| private ProtoMessageWriter<ManifestEntryProto> manifestEventsWriter; |
| private LocalDate manifestDate; |
| private TezDAGID currentDagId; |
| private long dagSubmittedEventOffset = -1; |
| |
| private String appEventsFile; |
| private long appLaunchedEventOffset; |
| private boolean splitDagStartEvents; |
| |
| public ProtoHistoryLoggingService() { |
| super(ProtoHistoryLoggingService.class.getName()); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| LOG.info("Initing ProtoHistoryLoggingService"); |
| setConfig(conf); |
| loggingDisabled = !conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, |
| TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); |
| splitDagStartEvents = conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START, |
| TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT); |
| final int queueSize = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE, |
| TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT); |
| eventQueue = new LinkedBlockingQueue<>(queueSize); |
| LOG.info("Inited ProtoHistoryLoggingService. loggingDisabled: {} splitDagStartEvents: {} queueSize: {}", |
| loggingDisabled, splitDagStartEvents, queueSize); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| LOG.info("Starting ProtoHistoryLoggingService"); |
| if (!loggingDisabled) { |
| loggers = new TezProtoLoggers(); |
| if (!loggers.setup(getConfig(), appContext.getClock())) { |
| LOG.warn("Log file location for ProtoHistoryLoggingService not specified, " + |
| "logging disabled"); |
| loggingDisabled = true; |
| return; |
| } |
| appEventsWriter = loggers.getAppEventsLogger().getWriter( |
| appContext.getApplicationAttemptId().toString()); |
| eventHandlingThread = new Thread(this::loop, "HistoryEventHandlingThread"); |
| eventHandlingThread.start(); |
| } |
| LOG.info("Started ProtoHistoryLoggingService"); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| LOG.info("Stopping ProtoHistoryLoggingService, eventQueueBacklog=" + eventQueue.size()); |
| stopped.set(true); |
| eventHandlingThread.join(); |
| IOUtils.closeQuietly(appEventsWriter); |
| IOUtils.closeQuietly(dagEventsWriter); |
| IOUtils.closeQuietly(manifestEventsWriter); |
| LOG.info("Stopped ProtoHistoryLoggingService"); |
| } |
| |
| @Override |
| public void handle(DAGHistoryEvent event) { |
| if (loggingDisabled || stopped.get()) { |
| return; |
| } |
| try { |
| eventQueue.add(event); |
| } catch (IllegalStateException e) { |
| LOG.error("Queue capacity filled up, ignoring event: " + |
| event.getHistoryEvent().getEventType()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Queue capacity filled up, ignoring event: {}", event.getHistoryEvent()); |
| } |
| } |
| } |
| |
| private void loop() { |
| // Keep looping while the service is not stopped. |
| // Drain any left over events after the service has been stopped. |
| while (!stopped.get() || !eventQueue.isEmpty()) { |
| DAGHistoryEvent evt = null; |
| try { |
| evt = eventQueue.poll(100, TimeUnit.MILLISECONDS); |
| if (evt != null) { |
| handleEvent(evt); |
| } |
| } catch (InterruptedException e) { |
| LOG.info("EventQueue poll interrupted, ignoring it.", e); |
| } catch (IOException e) { |
| TezDAGID dagid = evt.getDagID(); |
| HistoryEventType type = evt.getHistoryEvent().getEventType(); |
| // Retry is hard, because there are several places where this exception can happen |
| // the state will get messed up a lot. |
| LOG.error("Got exception while handling event {} for dag {}.", type, dagid, e); |
| } |
| } |
| } |
| |
| private void handleEvent(DAGHistoryEvent event) throws IOException { |
| if (loggingDisabled) { |
| return; |
| } |
| HistoryEvent historyEvent = event.getHistoryEvent(); |
| if (event.getDagID() == null) { |
| if (historyEvent.getEventType() == HistoryEventType.APP_LAUNCHED) { |
| appEventsFile = appEventsWriter.getPath().toString(); |
| appLaunchedEventOffset = appEventsWriter.getOffset(); |
| } |
| appEventsWriter.writeProto(converter.convert(historyEvent)); |
| } else { |
| HistoryEventType type = historyEvent.getEventType(); |
| TezDAGID dagId = event.getDagID(); |
| if (type == HistoryEventType.DAG_FINISHED) { |
| finishCurrentDag((DAGFinishedEvent)historyEvent); |
| } else if (type == HistoryEventType.DAG_SUBMITTED) { |
| finishCurrentDag(null); |
| currentDagId = dagId; |
| dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString() |
| + "_" + appContext.getApplicationAttemptId().getAttemptId()); |
| dagSubmittedEventOffset = dagEventsWriter.getOffset(); |
| dagEventsWriter.writeProto(converter.convert(historyEvent)); |
| } else if (dagEventsWriter != null) { |
| dagEventsWriter.writeProto(converter.convert(historyEvent)); |
| if (splitDagStartEvents && type == HistoryEventType.DAG_STARTED) { |
| // Close the file and write submitted event offset into manifest. |
| finishCurrentDag(null); |
| dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString() |
| + "_" + appContext.getApplicationAttemptId().getAttemptId() |
| + SPLIT_DAG_EVENTS_FILE_SUFFIX); |
| } |
| } |
| } |
| } |
| |
| private void finishCurrentDag(DAGFinishedEvent event) throws IOException { |
| if (dagEventsWriter == null) { |
| return; |
| } |
| try { |
| long finishEventOffset = -1; |
| if (event != null) { |
| finishEventOffset = dagEventsWriter.getOffset(); |
| dagEventsWriter.writeProto(converter.convert(event)); |
| } |
| DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger(); |
| if (manifestDate == null || !manifestDate.equals(manifestLogger.getNow().toLocalDate())) { |
| // The day has changed write to a new file. |
| IOUtils.closeQuietly(manifestEventsWriter); |
| manifestEventsWriter = manifestLogger.getWriter( |
| appContext.getApplicationAttemptId().toString()); |
| manifestDate = manifestLogger.getDateFromDir( |
| manifestEventsWriter.getPath().getParent().getName()); |
| } |
| ManifestEntryProto.Builder entry = ManifestEntryProto.newBuilder() |
| .setDagId(currentDagId.toString()) |
| .setAppId(currentDagId.getApplicationId().toString()) |
| .setDagSubmittedEventOffset(dagSubmittedEventOffset) |
| .setDagFinishedEventOffset(finishEventOffset) |
| .setDagFilePath(dagEventsWriter.getPath().toString()) |
| .setAppFilePath(appEventsFile) |
| .setAppLaunchedEventOffset(appLaunchedEventOffset) |
| .setWriteTime(System.currentTimeMillis()); |
| if (event != null) { |
| entry.setDagId(event.getDagID().toString()); |
| } |
| manifestEventsWriter.writeProto(entry.build()); |
| manifestEventsWriter.hflush(); |
| appEventsWriter.hflush(); |
| } finally { |
| // On an error, cleanup everything this will ensure, we do not use one dag's writer |
| // into another dag. |
| IOUtils.closeQuietly(dagEventsWriter); |
| dagEventsWriter = null; |
| dagSubmittedEventOffset = -1; |
| } |
| } |
| } |