| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.history.logging.proto; |
| |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.time.LocalDate; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.tez.common.VersionInfo; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.dag.DAGState; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.HistoryEventType; |
| import org.apache.tez.dag.history.events.AppLaunchedEvent; |
| import org.apache.tez.dag.history.events.DAGFinishedEvent; |
| import org.apache.tez.dag.history.events.DAGInitializedEvent; |
| import org.apache.tez.dag.history.events.DAGStartedEvent; |
| import org.apache.tez.dag.history.events.DAGSubmittedEvent; |
| import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; |
| import org.apache.tez.dag.history.events.TaskStartedEvent; |
| import org.apache.tez.dag.history.events.VertexStartedEvent; |
| import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto; |
| import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.hadoop.shim.HadoopShim; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
| public class TestProtoHistoryLoggingService { |
| private static ApplicationId appId = ApplicationId.newInstance(1000l, 1); |
| private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); |
| private static String user = "TEST_USER"; |
| private Clock clock; |
| |
| @Rule |
| public TemporaryFolder tempFolder = new TemporaryFolder(); |
| |
| @Test |
| public void testService() throws Exception { |
| ProtoHistoryLoggingService service = createService(false); |
| service.start(); |
| TezDAGID dagId = TezDAGID.getInstance(appId, 0); |
| List<HistoryEventProto> protos = new ArrayList<>(); |
| for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) { |
| protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent())); |
| service.handle(event); |
| } |
| service.stop(); |
| |
| TezProtoLoggers loggers = new TezProtoLoggers(); |
| Assert.assertTrue(loggers.setup(service.getConfig(), clock)); |
| |
| // Verify dag events are logged. |
| DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger(); |
| Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1"); |
| ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath); |
| assertEventsRead(reader, protos, 1, protos.size()); |
| reader.close(); |
| |
| // Verify app events are logged. |
| DatePartitionedLogger<HistoryEventProto> appLogger = loggers.getAppEventsLogger(); |
| Path appFilePath = appLogger.getPathForDate(LocalDate.ofEpochDay(0), attemptId.toString()); |
| ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath); |
| long appOffset = appReader.getOffset(); |
| Assert.assertEquals(protos.get(0), appReader.readEvent()); |
| appReader.close(); |
| |
| // Verify manifest events are logged. |
| DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger(); |
| Path manifestFilePath = manifestLogger.getPathForDate( |
| LocalDate.ofEpochDay(0), attemptId.toString()); |
| ProtoMessageReader<ManifestEntryProto> reader2 = manifestLogger.getReader(manifestFilePath); |
| ManifestEntryProto manifest = reader2.readEvent(); |
| Assert.assertEquals(appId.toString(), manifest.getAppId()); |
| Assert.assertEquals(dagId.toString(), manifest.getDagId()); |
| Assert.assertEquals(dagFilePath.toString(), manifest.getDagFilePath()); |
| Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath()); |
| Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset()); |
| |
| // Verify offsets in manifest logger. |
| reader = dagLogger.getReader(new Path(manifest.getDagFilePath())); |
| reader.setOffset(manifest.getDagSubmittedEventOffset()); |
| HistoryEventProto evt = reader.readEvent(); |
| Assert.assertNotNull(evt); |
| Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType()); |
| |
| reader.setOffset(manifest.getDagFinishedEventOffset()); |
| evt = reader.readEvent(); |
| Assert.assertNotNull(evt); |
| Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType()); |
| reader.close(); |
| |
| // Verify manifest file scanner. |
| DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger); |
| Assert.assertEquals(manifest, scanner.getNext()); |
| Assert.assertNull(scanner.getNext()); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testServiceSplitEvents() throws Exception { |
| ProtoHistoryLoggingService service = createService(true); |
| service.start(); |
| TezDAGID dagId = TezDAGID.getInstance(appId, 0); |
| List<HistoryEventProto> protos = new ArrayList<>(); |
| for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) { |
| protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent())); |
| service.handle(event); |
| } |
| service.stop(); |
| |
| TezProtoLoggers loggers = new TezProtoLoggers(); |
| Assert.assertTrue(loggers.setup(service.getConfig(), clock)); |
| |
| // Verify dag events are logged. |
| DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger(); |
| Path dagFilePath1 = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1"); |
| Path dagFilePath2 = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1" + |
| ProtoHistoryLoggingService.SPLIT_DAG_EVENTS_FILE_SUFFIX); |
| |
| try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath1)) { |
| assertEventsRead(reader, protos, 1, 1 + 3); |
| } |
| |
| try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath2)) { |
| assertEventsRead(reader, protos, 4, protos.size()); |
| } |
| |
| // Verify app events are logged. |
| DatePartitionedLogger<HistoryEventProto> appLogger = loggers.getAppEventsLogger(); |
| Path appFilePath = appLogger.getPathForDate(LocalDate.ofEpochDay(0), attemptId.toString()); |
| ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath); |
| long appOffset = appReader.getOffset(); |
| Assert.assertEquals(protos.get(0), appReader.readEvent()); |
| appReader.close(); |
| |
| // Verify manifest events are logged. |
| DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger(); |
| DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger); |
| Path manifestFilePath = manifestLogger.getPathForDate( |
| LocalDate.ofEpochDay(0), attemptId.toString()); |
| ProtoMessageReader<ManifestEntryProto> manifestReader = manifestLogger.getReader( |
| manifestFilePath); |
| ManifestEntryProto manifest = manifestReader.readEvent(); |
| Assert.assertEquals(manifest, scanner.getNext()); |
| Assert.assertEquals(appId.toString(), manifest.getAppId()); |
| Assert.assertEquals(dagId.toString(), manifest.getDagId()); |
| Assert.assertEquals(dagFilePath1.toString(), manifest.getDagFilePath()); |
| Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath()); |
| Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset()); |
| Assert.assertEquals(-1, manifest.getDagFinishedEventOffset()); |
| |
| HistoryEventProto evt = null; |
| // Verify offsets in manifest logger. |
| try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader( |
| new Path(manifest.getDagFilePath()))) { |
| reader.setOffset(manifest.getDagSubmittedEventOffset()); |
| evt = reader.readEvent(); |
| Assert.assertNotNull(evt); |
| Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType()); |
| } |
| |
| manifest = manifestReader.readEvent(); |
| Assert.assertEquals(manifest, scanner.getNext()); |
| Assert.assertEquals(appId.toString(), manifest.getAppId()); |
| Assert.assertEquals(dagId.toString(), manifest.getDagId()); |
| Assert.assertEquals(dagFilePath2.toString(), manifest.getDagFilePath()); |
| Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath()); |
| Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset()); |
| Assert.assertEquals(-1, manifest.getDagSubmittedEventOffset()); |
| |
| try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader( |
| new Path(manifest.getDagFilePath()))) { |
| reader.setOffset(manifest.getDagFinishedEventOffset()); |
| evt = reader.readEvent(); |
| Assert.assertNotNull(evt); |
| Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType()); |
| } |
| |
| // Verify manifest file scanner. |
| Assert.assertNull(scanner.getNext()); |
| scanner.close(); |
| } |
| |
| private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, |
| ProtoHistoryLoggingService service) { |
| List<DAGHistoryEvent> historyEvents = new ArrayList<>(); |
| DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); |
| |
| long time = System.currentTimeMillis(); |
| Configuration conf = new Configuration(service.getConfig()); |
| historyEvents.add(new DAGHistoryEvent(null, new AppLaunchedEvent(appId, time, time, user, conf, |
| new VersionInfo("component", "1.1.0", "rev1", "20120101", "git.apache.org") {}))); |
| historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, |
| DAGPlan.getDefaultInstance(), attemptId, null, user, conf, null, "default"))); |
| historyEvents.add(new DAGHistoryEvent(dagId, new DAGInitializedEvent(dagId, time + 1, user, |
| "test_dag", Collections.emptyMap()))); |
| historyEvents.add(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, time + 2, user, |
| "test_dag"))); |
| |
| TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); |
| historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); |
| TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); |
| historyEvents |
| .add(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, "test", time, time))); |
| historyEvents.add(new DAGHistoryEvent(dagId, |
| new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time, |
| ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null, |
| null, null))); |
| historyEvents.add(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, time, time, |
| DAGState.ERROR, "diagnostics", null, user, dagPlan.getName(), |
| new HashMap<String, Integer>(), attemptId, dagPlan))); |
| return historyEvents; |
| } |
| |
| private static class FixedClock implements Clock { |
| final Clock clock = SystemClock.getInstance(); |
| final long diff; |
| |
| public FixedClock(long startTime) { |
| diff = clock.getTime() - startTime; |
| } |
| |
| @Override |
| public long getTime() { |
| return clock.getTime() - diff; |
| } |
| } |
| |
| private ProtoHistoryLoggingService createService(boolean splitEvents) throws IOException { |
| ProtoHistoryLoggingService service = new ProtoHistoryLoggingService(); |
| clock = new FixedClock(0); // Start time is always first day, easier to write tests. |
| AppContext appContext = mock(AppContext.class); |
| when(appContext.getApplicationID()).thenReturn(appId); |
| when(appContext.getApplicationAttemptId()).thenReturn(attemptId); |
| when(appContext.getUser()).thenReturn(user); |
| when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {}); |
| when(appContext.getClock()).thenReturn(clock); |
| service.setAppContext(appContext); |
| Configuration conf = new Configuration(false); |
| String basePath = tempFolder.newFolder().getAbsolutePath(); |
| conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); |
| conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath); |
| conf.setBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START, splitEvents); |
| service.init(conf); |
| return service; |
| } |
| |
| private void assertEventsRead(ProtoMessageReader<HistoryEventProto> reader, |
| List<HistoryEventProto> protos, int start, int finish) throws Exception { |
| for (int i = start; i < finish; ++i) { |
| try { |
| HistoryEventProto evt = reader.readEvent(); |
| Assert.assertEquals(protos.get(i), evt); |
| } catch (EOFException e) { |
| Assert.fail("Unexpected eof"); |
| } |
| } |
| try { |
| HistoryEventProto evt = reader.readEvent(); |
| Assert.assertNull(evt); |
| } catch (EOFException e) { |
| // Expected. |
| } |
| } |
| } |