| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.oozie.command.wf; |
| |
| import org.apache.oozie.command.OperationType; |
| import org.apache.oozie.WorkflowActionBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.test.XDataTestCase; |
| import org.apache.oozie.workflow.WorkflowInstance; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| public class TestBulkWorkflowXCommand extends XDataTestCase { |
| private static final String VALID_APP_NAME = "testApp"; |
| private static final String INVALID_APP_NAME = "testApp-new"; |
| private Services services; |
| |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| services = new Services(); |
| services.init(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| services.destroy(); |
| super.tearDown(); |
| } |
| |
| public void testbulkWfKillSuspendResumeSuccess() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.PREP); |
| |
| WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.PREP); |
| |
| Map<String, List<String>> map = getFilterMap(VALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 10, OperationType.Suspend).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.SUSPENDED); |
| verifyJobStatus(job2.getId(), WorkflowJob.Status.SUSPENDED); |
| |
| new BulkWorkflowXCommand(map, 1, 10, OperationType.Resume).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); |
| verifyJobStatus(job2.getId(), WorkflowJob.Status.RUNNING); |
| |
| new BulkWorkflowXCommand(map, 1, 10, OperationType.Kill).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.KILLED); |
| verifyJobStatus(job2.getId(), WorkflowJob.Status.KILLED); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.KILLED, WorkflowAction.Status.FAILED); |
| verifyActionStatus(action2.getId(), WorkflowAction.Status.KILLED, WorkflowAction.Status.FAILED); |
| } |
| |
| public void testbulkWfKillSuccess() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); |
| WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| Map<String, List<String>> map = getFilterMap(VALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Kill).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.KILLED); |
| verifyJobStatus(job2.getId(), WorkflowJob.Status.KILLED); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.KILLED, WorkflowAction.Status.FAILED); |
| verifyActionStatus(action2.getId(), WorkflowAction.Status.KILLED, WorkflowAction.Status.FAILED); |
| } |
| |
| public void testbulkWfKillNoOp() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); |
| WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.DONE); |
| |
| Map<String, List<String>> map = getFilterMap(VALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Kill).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.KILLED); |
| verifyJobStatus(job2.getId(), WorkflowJob.Status.SUCCEEDED); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.KILLED, WorkflowAction.Status.FAILED); |
| verifyActionStatus(action2.getId(), WorkflowAction.Status.DONE); |
| } |
| |
| public void testbulkWfKillNegative() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| Map<String, List<String>> map = getFilterMap(INVALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Kill).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); |
| } |
| |
| public void testBulkSuspendNoOp() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); |
| WorkflowActionBean action2 = this.addRecordToWfActionTable(job2.getId(), "1", WorkflowAction.Status.DONE); |
| |
| Map<String, List<String>> map = getFilterMap(VALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Suspend).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.SUSPENDED); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); |
| verifyJobStatus(job2.getId(), WorkflowJob.Status.SUCCEEDED); |
| verifyActionStatus(action2.getId(), WorkflowAction.Status.DONE); |
| } |
| |
| public void testBulkSuspendNegative() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| Map<String, List<String>> map = getFilterMap(INVALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Suspend).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); |
| } |
| |
| public void testBulkResumeNegative() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| Map<String, List<String>> map = getFilterMap(INVALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Resume).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.SUSPENDED); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); |
| } |
| |
| public void testBulkResumeNoOp() throws Exception { |
| WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.RUNNING); |
| |
| Map<String, List<String>> map = getFilterMap(VALID_APP_NAME); |
| |
| new BulkWorkflowXCommand(map, 1, 50, OperationType.Resume).call(); |
| verifyJobStatus(job1.getId(), WorkflowJob.Status.RUNNING); |
| verifyActionStatus(action1.getId(), WorkflowAction.Status.RUNNING); |
| } |
| |
| private void verifyJobStatus(String jobId, WorkflowJob.Status status) throws Exception { |
| WorkflowJobBean job = WorkflowJobQueryExecutor.getInstance().get( |
| WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId); |
| assertEquals( "Invalid job status", status, job.getStatus()); |
| } |
| |
| private void verifyActionStatus(String actionId, WorkflowAction.Status... statuses) throws Exception { |
| WorkflowActionBean action = WorkflowActionQueryExecutor.getInstance().get( |
| WorkflowActionQueryExecutor.WorkflowActionQuery.GET_ACTION, actionId); |
| assertTrue("Invalid action status", Arrays.asList(statuses).contains(action.getStatus())); |
| } |
| |
| private Map<String, List<String>> getFilterMap(String appName) { |
| Map<String, List<String>> map = new HashMap<>(); |
| List<String> names = Collections.singletonList(appName); |
| map.put("name", names); |
| return map; |
| } |
| |
| } |