blob: d878cb1eb81164d5d2b71d36662b571bb89d27fd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server.scheduler;
import java.util.*;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.scheduler.*;
import org.apache.lens.server.LensServerConf;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.util.UtilityMethods;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.hadoop.conf.Configuration;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Test(groups = "unit-test")
public class SchedulerDAOTest {
SchedulerDAO schedulerDAO;
Map<SchedulerJobInstanceHandle, SchedulerJobInstanceInfo> instances = new HashMap<>();
SchedulerJobHandle jobHandle = null;
@BeforeClass
public void setup() throws Exception {
System.setProperty(LensConfConstants.CONFIG_LOCATION, "target/test-classes/");
Configuration conf = LensServerConf.getHiveConf();
QueryRunner runner = new QueryRunner(UtilityMethods.getDataSourceFromConfForScheduler(conf));
// Cleanup all tables
runner.update("DROP TABLE IF EXISTS job_table");
runner.update("DROP TABLE IF EXISTS job_instance_table");
this.schedulerDAO = new SchedulerDAO(conf);
}
private XTrigger getTestTrigger() {
XTrigger trigger = new XTrigger();
XFrequency frequency = new XFrequency();
frequency.setCronExpression("0 0 12 * * ?");
frequency.setTimezone("UTC");
trigger.setFrequency(frequency);
return trigger;
}
private XExecution getTestExecution() {
XExecution execution = new XExecution();
XJobQuery query = new XJobQuery();
query.setQuery("select * from test_table");
execution.setQuery(query);
XSessionType sessionType = new XSessionType();
sessionType.setDb("test");
execution.setSession(sessionType);
return execution;
}
private XJob getTestJob() throws DatatypeConfigurationException {
XJob job = new XJob();
job.setTrigger(getTestTrigger());
job.setName("Test lens Job");
GregorianCalendar startTime = new GregorianCalendar();
startTime.setTimeInMillis(System.currentTimeMillis());
XMLGregorianCalendar start = DatatypeFactory.newInstance().newXMLGregorianCalendar(startTime);
GregorianCalendar endTime = new GregorianCalendar();
endTime.setTimeInMillis(System.currentTimeMillis());
XMLGregorianCalendar end = DatatypeFactory.newInstance().newXMLGregorianCalendar(endTime);
job.setStartTime(start);
job.setEndTime(end);
job.setExecution(getTestExecution());
return job;
}
@Test(priority = 1)
public void testStoreJob() throws Exception {
XJob job = getTestJob();
long currentTime = System.currentTimeMillis();
jobHandle = new SchedulerJobHandle(UUID.randomUUID());
SchedulerJobInfo info = new SchedulerJobInfo(jobHandle, job, "lens", SchedulerJobState.NEW, currentTime,
currentTime);
// Store the job
schedulerDAO.storeJob(info);
// Retrive the stored job
XJob outJob = schedulerDAO.getJob(info.getId());
Assert.assertEquals(job, outJob);
}
@Test(priority = 2)
public void testStoreInstance() throws Exception {
long currentTime = System.currentTimeMillis();
SchedulerJobInstanceHandle instanceHandle = new SchedulerJobInstanceHandle(UUID.randomUUID());
SchedulerJobInstanceInfo firstInstance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, currentTime,
new ArrayList<SchedulerJobInstanceRun>());
SchedulerJobInstanceRun run1 = new SchedulerJobInstanceRun(instanceHandle, 1,
new LensSessionHandle(UUID.randomUUID(), UUID.randomUUID()), currentTime, currentTime, "/tmp/",
QueryHandle.fromString(UUID.randomUUID().toString()), SchedulerJobInstanceState.WAITING);
instances.put(firstInstance.getId(), firstInstance);
schedulerDAO.storeJobInstance(firstInstance);
schedulerDAO.storeJobInstanceRun(run1);
// Put run in the instance
firstInstance.getInstanceRunList().add(run1);
currentTime = System.currentTimeMillis();
instanceHandle = new SchedulerJobInstanceHandle(UUID.randomUUID());
SchedulerJobInstanceInfo secondInstance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, currentTime,
new ArrayList<SchedulerJobInstanceRun>());
SchedulerJobInstanceRun run2 = new SchedulerJobInstanceRun(instanceHandle, 1,
new LensSessionHandle(UUID.randomUUID(), UUID.randomUUID()), currentTime, currentTime, "/tmp/",
QueryHandle.fromString(UUID.randomUUID().toString()), SchedulerJobInstanceState.WAITING);
instances.put(secondInstance.getId(), secondInstance);
schedulerDAO.storeJobInstance(secondInstance);
schedulerDAO.storeJobInstanceRun(run2);
secondInstance.getInstanceRunList().add(run2);
List<SchedulerJobInstanceInfo> handleList = schedulerDAO.getJobInstances(jobHandle);
// Size should be 2
Assert.assertEquals(handleList.size(), 2);
// Get the definition of instance from the store.
SchedulerJobInstanceInfo instance1 = handleList.get(0);
Assert.assertEquals(instances.get(handleList.get(0).getId()), instance1);
SchedulerJobInstanceInfo instance2 = handleList.get(1);
Assert.assertEquals(instances.get(handleList.get(1).getId()), instance2);
}
@Test(priority = 2)
public void testUpdateJob() throws Exception {
// Get all the stored jobs.
// update one and check if it successful.
SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle);
XJob newJob = getTestJob();
jobInfo.setJob(newJob);
schedulerDAO.updateJob(jobInfo);
XJob storedJob = schedulerDAO.getJob(jobInfo.getId());
Assert.assertEquals(storedJob, newJob);
// Change SchedulerJobInstanceState
jobInfo.setJobState(jobInfo.getJobState().nextTransition(SchedulerJobEvent.ON_SCHEDULE));
schedulerDAO.updateJobStatus(jobInfo);
Assert.assertEquals(schedulerDAO.getJobState(jobInfo.getId()), SchedulerJobState.SCHEDULED);
}
@Test(priority = 3)
public void testUpdateJobInstance() {
SchedulerJobInstanceHandle handle = instances.keySet().iterator().next();
SchedulerJobInstanceInfo info = instances.get(handle);
SchedulerJobInstanceRun run = info.getInstanceRunList().get(0);
run.setInstanceState(SchedulerJobInstanceState.LAUNCHING);
schedulerDAO.updateJobInstanceRun(run);
// Get the instance
Assert.assertEquals(schedulerDAO.getSchedulerJobInstanceInfo(handle), info);
}
@Test(priority = 3)
public void testSearchStoreJob() throws Exception {
// Store more jobs with the one user and search
XJob job = getTestJob();
long currentTime = System.currentTimeMillis();
SchedulerJobInfo info = new SchedulerJobInfo(SchedulerJobHandle.fromString(UUID.randomUUID().toString()), job,
"lens", SchedulerJobState.NEW, currentTime, currentTime);
// Store the job
schedulerDAO.storeJob(info);
info = new SchedulerJobInfo(SchedulerJobHandle.fromString(UUID.randomUUID().toString()), job, "lens",
SchedulerJobState.NEW, currentTime, currentTime);
schedulerDAO.storeJob(info);
// There should be 3 jobs till now.
Assert.assertEquals(schedulerDAO.getJobs("lens", null, null, SchedulerJobState.values()).size(), 3);
//Assert.assertEquals(schedulerDAO.getJobs("lens", 1L, System.currentTimeMillis(), SchedulerJobState.NEW).size(), 2);
Assert.assertEquals(schedulerDAO.getJobs("Alice", null, null, SchedulerJobState.NEW).size(), 0);
}
}