| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ambari.view.hive2; |
| |
| import akka.actor.ActorRef; |
| import akka.actor.ActorSystem; |
| import akka.actor.Props; |
| import akka.testkit.JavaTestKit; |
| import com.beust.jcommander.internal.Lists; |
| import com.google.common.base.Optional; |
| import org.apache.ambari.view.ViewContext; |
| import org.apache.ambari.view.hive2.actor.DeathWatch; |
| import org.apache.ambari.view.hive2.actor.OperationController; |
| import org.apache.ambari.view.hive2.actor.message.Connect; |
| import org.apache.ambari.view.hive2.actor.message.ExecuteJob; |
| import org.apache.ambari.view.hive2.actor.message.HiveJob; |
| import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; |
| import org.apache.ambari.view.hive2.internal.ConnectionSupplier; |
| import org.apache.ambari.view.hive2.internal.DataStorageSupplier; |
| import org.apache.ambari.view.hive2.internal.HdfsApiSupplier; |
| import org.apache.ambari.view.hive2.internal.HiveConnectionWrapper; |
| import org.apache.ambari.view.hive2.persistence.Storage; |
| import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; |
| import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; |
| import org.apache.ambari.view.utils.hdfs.HdfsApi; |
| import org.apache.hive.jdbc.HiveConnection; |
| import org.apache.hive.jdbc.HiveQueryResultSet; |
| import org.apache.hive.jdbc.HiveStatement; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.sql.ResultSet; |
| import java.util.HashMap; |
| |
| import static org.easymock.EasyMock.*; |
| |
| public class JobExecutionTest { |
| |
| private ActorSystem actorSystem; |
| |
| @Before |
| public void setUp() throws Exception { |
| actorSystem = ActorSystem.create("TestingActorSystem"); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| JavaTestKit.shutdownActorSystem(actorSystem); |
| } |
| |
| @Test |
| public void testExecuteJob() throws Exception { |
| ViewContext viewContext = createNiceMock(ViewContext.class); |
| ConnectionSupplier connectionSupplier = createNiceMock(ConnectionSupplier.class); |
| DataStorageSupplier dataStorageSupplier = createNiceMock(DataStorageSupplier.class); |
| HdfsApi hdfsApi = createNiceMock(HdfsApi.class); |
| HdfsApiSupplier hdfsApiSupplier = createNiceMock(HdfsApiSupplier.class); |
| Connect connect = createNiceMock(Connect.class); |
| Storage storage = createNiceMock(Storage.class); |
| JobImpl jobImpl = createNiceMock(JobImpl.class); |
| ResultSet resultSet = createNiceMock(HiveQueryResultSet.class); |
| HiveStatement statement = createNiceMock(HiveStatement.class); |
| ConnectionDelegate delegate = createNiceMock(ConnectionDelegate.class); |
| HiveConnectionWrapper connectionWrapper = createNiceMock(HiveConnectionWrapper.class); |
| HiveConnection hiveConnection = createNiceMock(HiveConnection.class); |
| HiveJob test = new SQLStatementJob(HiveJob.Type.ASYNC, new String[]{"select * from test"}, "test", "1", "test.log"); |
| ExecuteJob executeJob = new ExecuteJob(connect, test); |
| ActorRef deathwatch = actorSystem.actorOf(Props.create(DeathWatch.class)); |
| ActorRef operationControl = actorSystem.actorOf( |
| Props.create(OperationController.class, actorSystem, deathwatch, viewContext, connectionSupplier, dataStorageSupplier, hdfsApiSupplier), "operationController-test"); |
| expect(hdfsApiSupplier.get(viewContext)).andReturn(Optional.of(hdfsApi)); |
| expect(viewContext.getProperties()).andReturn(new HashMap<String, String>()).anyTimes(); |
| expect(connect.getConnectable(anyObject(AuthParams.class))).andReturn(connectionWrapper); |
| expect(connectionWrapper.isOpen()).andReturn(false); |
| expect(connectionWrapper.getConnection()).andReturn(Optional.of(hiveConnection)).anyTimes(); |
| expect(dataStorageSupplier.get(viewContext)).andReturn(storage); |
| expect(connectionSupplier.get(viewContext)).andReturn(delegate); |
| expect(storage.load(JobImpl.class, "1")).andReturn(jobImpl).anyTimes(); |
| expect(delegate.createStatement(hiveConnection)).andReturn(statement); |
| expect(delegate.execute("select * from test")).andReturn(Optional.of(resultSet)); |
| expect(statement.getQueryLog()).andReturn(Lists.<String>newArrayList()); |
| expect(jobImpl.getDateSubmitted()).andReturn(0L).times(2); |
| jobImpl.setStatus(Job.JOB_STATE_RUNNING); |
| storage.store(JobImpl.class, jobImpl); |
| connectionWrapper.connect(); |
| jobImpl.setStatus(Job.JOB_STATE_FINISHED); |
| storage.store(JobImpl.class, jobImpl); |
| replay(viewContext, connect, hdfsApiSupplier, dataStorageSupplier, connectionWrapper, |
| storage, jobImpl, connectionSupplier, delegate, statement, resultSet); |
| |
| operationControl.tell(executeJob, ActorRef.noSender()); |
| Thread.sleep(5000); |
| verify(connect, hdfsApiSupplier, dataStorageSupplier, connectionWrapper, |
| storage, jobImpl, connectionSupplier, delegate, statement, resultSet); |
| |
| } |
| |
| |
| } |