| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.util; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.api.services.dataflow.model.JobMessage; |
| import com.google.api.services.dataflow.model.ListJobMessagesResponse; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import org.apache.beam.runners.dataflow.DataflowClient; |
| import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; |
| import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler; |
| import org.apache.beam.sdk.PipelineResult.State; |
| import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.testing.ExpectedLogs; |
| import org.joda.time.DateTime; |
| import org.joda.time.Instant; |
| import org.joda.time.chrono.ISOChronology; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for MonitoringUtil. */ |
| @RunWith(JUnit4.class) |
| public class MonitoringUtilTest { |
| private static final String PROJECT_ID = "someProject"; |
| private static final String REGION_ID = "thatRegion"; |
| private static final String JOB_ID = "1234"; |
| |
| @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LoggingHandler.class); |
| @Rule public ExpectedException thrown = ExpectedException.none(); |
| |
| @Test |
| public void testGetJobMessages() throws IOException { |
| DataflowClient dataflowClient = mock(DataflowClient.class); |
| |
| ListJobMessagesResponse firstResponse = new ListJobMessagesResponse(); |
| firstResponse.setJobMessages(new ArrayList<>()); |
| for (long i = 0; i < 100; ++i) { |
| JobMessage message = new JobMessage(); |
| message.setId("message_" + i); |
| message.setTime(TimeUtil.toCloudTime(new Instant(i))); |
| firstResponse.getJobMessages().add(message); |
| } |
| String pageToken = "page_token"; |
| firstResponse.setNextPageToken(pageToken); |
| |
| ListJobMessagesResponse secondResponse = new ListJobMessagesResponse(); |
| secondResponse.setJobMessages(new ArrayList<>()); |
| for (long i = 100; i < 150; ++i) { |
| JobMessage message = new JobMessage(); |
| message.setId("message_" + i); |
| message.setTime(TimeUtil.toCloudTime(new Instant(i))); |
| secondResponse.getJobMessages().add(message); |
| } |
| |
| when(dataflowClient.listJobMessages(JOB_ID, null)).thenReturn(firstResponse); |
| when(dataflowClient.listJobMessages(JOB_ID, pageToken)).thenReturn(secondResponse); |
| |
| MonitoringUtil util = new MonitoringUtil(dataflowClient); |
| |
| List<JobMessage> messages = util.getJobMessages(JOB_ID, -1); |
| |
| assertEquals(150, messages.size()); |
| } |
| |
| @Test |
| public void testToStateNormal() { |
| // Trivially mapped cases |
| assertEquals(State.UNKNOWN, MonitoringUtil.toState("JOB_STATE_UNKNOWN")); |
| assertEquals(State.STOPPED, MonitoringUtil.toState("JOB_STATE_STOPPED")); |
| assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_RUNNING")); |
| assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DONE")); |
| assertEquals(State.FAILED, MonitoringUtil.toState("JOB_STATE_FAILED")); |
| assertEquals(State.CANCELLED, MonitoringUtil.toState("JOB_STATE_CANCELLED")); |
| assertEquals(State.UPDATED, MonitoringUtil.toState("JOB_STATE_UPDATED")); |
| |
| // Non-trivially mapped cases |
| assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_DRAINING")); |
| assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DRAINED")); |
| } |
| |
| @Test |
| public void testToStateWithNullReturnsUnrecognized() { |
| assertEquals(State.UNRECOGNIZED, MonitoringUtil.toState(null)); |
| } |
| |
| @Test |
| public void testToStateWithOtherValueReturnsUnknown() { |
| assertEquals(State.UNRECOGNIZED, MonitoringUtil.toState("FOO_BAR_BAZ")); |
| } |
| |
| @Test |
| public void testDontOverrideEndpointWithDefaultApi() { |
| DataflowPipelineOptions options = |
| PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); |
| options.setProject(PROJECT_ID); |
| options.setRegion(REGION_ID); |
| options.setGcpCredential(new TestCredential()); |
| String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID); |
| assertEquals( |
| "gcloud dataflow jobs --project=someProject cancel --region=thatRegion 1234", |
| cancelCommand); |
| } |
| |
| @Test |
| public void testOverridesEndpointWithStagedDataflowEndpoint() { |
| DataflowPipelineOptions options = |
| PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); |
| options.setProject(PROJECT_ID); |
| options.setRegion(REGION_ID); |
| options.setGcpCredential(new TestCredential()); |
| String stagingDataflowEndpoint = "v0neverExisted"; |
| options.setDataflowEndpoint(stagingDataflowEndpoint); |
| String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID); |
| assertEquals( |
| "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ " |
| + "gcloud dataflow jobs --project=someProject cancel --region=thatRegion 1234", |
| cancelCommand); |
| } |
| |
| @Test |
| public void testLoggingHandler() { |
| DateTime errorTime = new DateTime(1000L, ISOChronology.getInstanceUTC()); |
| DateTime warningTime = new DateTime(2000L, ISOChronology.getInstanceUTC()); |
| DateTime basicTime = new DateTime(3000L, ISOChronology.getInstanceUTC()); |
| DateTime detailedTime = new DateTime(4000L, ISOChronology.getInstanceUTC()); |
| DateTime debugTime = new DateTime(5000L, ISOChronology.getInstanceUTC()); |
| DateTime unknownTime = new DateTime(6000L, ISOChronology.getInstanceUTC()); |
| JobMessage errorJobMessage = new JobMessage(); |
| errorJobMessage.setMessageImportance("JOB_MESSAGE_ERROR"); |
| errorJobMessage.setMessageText("ERRORERROR"); |
| errorJobMessage.setTime(TimeUtil.toCloudTime(errorTime)); |
| JobMessage warningJobMessage = new JobMessage(); |
| warningJobMessage.setMessageImportance("JOB_MESSAGE_WARNING"); |
| warningJobMessage.setMessageText("WARNINGWARNING"); |
| warningJobMessage.setTime(TimeUtil.toCloudTime(warningTime)); |
| JobMessage basicJobMessage = new JobMessage(); |
| basicJobMessage.setMessageImportance("JOB_MESSAGE_BASIC"); |
| basicJobMessage.setMessageText("BASICBASIC"); |
| basicJobMessage.setTime(TimeUtil.toCloudTime(basicTime)); |
| JobMessage detailedJobMessage = new JobMessage(); |
| detailedJobMessage.setMessageImportance("JOB_MESSAGE_DETAILED"); |
| detailedJobMessage.setMessageText("DETAILEDDETAILED"); |
| detailedJobMessage.setTime(TimeUtil.toCloudTime(detailedTime)); |
| JobMessage debugJobMessage = new JobMessage(); |
| debugJobMessage.setMessageImportance("JOB_MESSAGE_DEBUG"); |
| debugJobMessage.setMessageText("DEBUGDEBUG"); |
| debugJobMessage.setTime(TimeUtil.toCloudTime(debugTime)); |
| JobMessage unknownJobMessage = new JobMessage(); |
| unknownJobMessage.setMessageImportance("JOB_MESSAGE_UNKNOWN"); |
| unknownJobMessage.setMessageText("UNKNOWNUNKNOWN"); |
| unknownJobMessage.setTime(""); |
| JobMessage emptyJobMessage = new JobMessage(); |
| emptyJobMessage.setMessageImportance("JOB_MESSAGE_EMPTY"); |
| emptyJobMessage.setTime(TimeUtil.toCloudTime(unknownTime)); |
| |
| new LoggingHandler() |
| .process( |
| Arrays.asList( |
| errorJobMessage, |
| warningJobMessage, |
| basicJobMessage, |
| detailedJobMessage, |
| debugJobMessage, |
| unknownJobMessage)); |
| |
| expectedLogs.verifyError("ERRORERROR"); |
| expectedLogs.verifyError(errorTime.toString()); |
| expectedLogs.verifyWarn("WARNINGWARNING"); |
| expectedLogs.verifyWarn(warningTime.toString()); |
| expectedLogs.verifyInfo("BASICBASIC"); |
| expectedLogs.verifyInfo(basicTime.toString()); |
| expectedLogs.verifyInfo("DETAILEDDETAILED"); |
| expectedLogs.verifyInfo(detailedTime.toString()); |
| expectedLogs.verifyDebug("DEBUGDEBUG"); |
| expectedLogs.verifyDebug(debugTime.toString()); |
| expectedLogs.verifyTrace("UNKNOWN TIMESTAMP"); |
| expectedLogs.verifyTrace("UNKNOWNUNKNOWN"); |
| expectedLogs.verifyNotLogged(unknownTime.toString()); |
| } |
| } |