LENS-1265: Scheduler Bug fixes (followup to LENS-128)
diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java
index b19248f..50562f4 100644
--- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java
+++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInfo.java
@@ -18,14 +18,19 @@
  */
 package org.apache.lens.api.scheduler;
 
+import javax.xml.bind.annotation.XmlRootElement;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 /**
  * POJO to represent the <code>job</code> table in the database.
  */
 @Data
 @AllArgsConstructor
+@NoArgsConstructor
+@XmlRootElement
 public class SchedulerJobInfo {
 
   /**
@@ -65,5 +70,4 @@
    * @return last modified time for this job
    */
   private long modifiedOn;
-
 }
diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java
index 52b56ca..9148af1 100644
--- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java
+++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceInfo.java
@@ -20,14 +20,19 @@
 
 import java.util.List;
 
+import javax.xml.bind.annotation.XmlRootElement;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 /**
  * POJO for an instance of SchedulerJob.
  */
 @Data
 @AllArgsConstructor
+@NoArgsConstructor
+@XmlRootElement
 public class SchedulerJobInstanceInfo {
 
   /**
diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java
index e6c1571..8532ed0 100644
--- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java
+++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceRun.java
@@ -18,16 +18,21 @@
  */
 package org.apache.lens.api.scheduler;
 
+import javax.xml.bind.annotation.XmlRootElement;
+
 import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.query.QueryHandle;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
 
 @Data
 @AllArgsConstructor
 @EqualsAndHashCode
+@NoArgsConstructor
+@XmlRootElement
 public class SchedulerJobInstanceRun {
 
   /**
diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java
index 93d3d7e..3d2605e 100644
--- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java
+++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobInstanceState.java
@@ -18,8 +18,11 @@
  */
 package org.apache.lens.api.scheduler;
 
+import javax.xml.bind.annotation.*;
+
 import org.apache.lens.api.error.InvalidStateTransitionException;
 
+@XmlRootElement
 public enum SchedulerJobInstanceState
     implements StateTransitioner<SchedulerJobInstanceState, SchedulerJobInstanceEvent> {
   // repeating same operation will return the same state to ensure idempotent behavior.
diff --git a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java
index ffaae6c..f4fcce1 100644
--- a/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java
+++ b/lens-api/src/main/java/org/apache/lens/api/scheduler/SchedulerJobState.java
@@ -18,8 +18,11 @@
  */
 package org.apache.lens.api.scheduler;
 
+import javax.xml.bind.annotation.*;
+
 import org.apache.lens.api.error.InvalidStateTransitionException;
 
+@XmlRootElement
 public enum SchedulerJobState implements StateTransitioner<SchedulerJobState, SchedulerJobEvent> {
   // repeating same operation will return the same state to ensure idempotent behavior.
   NEW {
diff --git a/lens-api/src/main/resources/scheduler-job-0.1.xsd b/lens-api/src/main/resources/scheduler-job-0.1.xsd
index 4e6c68b..31f7d66 100644
--- a/lens-api/src/main/resources/scheduler-job-0.1.xsd
+++ b/lens-api/src/main/resources/scheduler-job-0.1.xsd
@@ -126,7 +126,7 @@
                     </xs:documentation>
                 </xs:annotation>
             </xs:element>
-            <xs:element type="xs:string" name="resource_path" minOccurs="0" maxOccurs="unbounded">
+            <xs:element type="resource_path" name="resource_path" minOccurs="0" maxOccurs="unbounded">
                 <xs:annotation>
                     <xs:documentation>
                         Path for resources like jars etc. e.g. /path/to/my/jar
@@ -136,6 +136,18 @@
         </xs:sequence>
     </xs:complexType>
 
+    <xs:complexType name="resource_path">
+        <xs:annotation>
+            <xs:documentation>
+                A resource path with file name and type
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element name="path" type="xs:string"/>
+            <xs:element name="type" type="xs:string"/>
+        </xs:sequence>
+    </xs:complexType>
+
     <xs:complexType name="x_job_query">
         <xs:annotation>
             <xs:documentation>
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java
index c7f73eb..8e1606e 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/scheduler/SchedulerService.java
@@ -70,22 +70,20 @@
   /**
    * Returns the definition of a job.
    *
-   * @param sessionHandle handle for the session.
    * @param jobHandle     handle for the job
    * @return job definition
    * @throws LensException the lens exception
    */
-  XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException;
+  XJob getJobDefinition(SchedulerJobHandle jobHandle) throws LensException;
 
   /**
    * Returns the details of a job. Details may contain extra system information like id for the job.
    *
-   * @param sessionHandle handle for the session.
    * @param jobHandle     handle for the job
    * @return job details for the job
    * @throws LensException the lens exception
    */
-  SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException;
+  SchedulerJobInfo getJobDetails(SchedulerJobHandle jobHandle) throws LensException;
 
   /**
    * Update a job with new definition.
@@ -146,7 +144,6 @@
   boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException;
 
   /**
-   * @param sessionHandle handle for the current session.
    * @param state         filter for status, if specified only jobs in that state will be returned,
    *                      if null no entries will be removed from result
    * @param user          filter for user who submitted the job, if specified only jobs submitted by the given user
@@ -158,13 +155,12 @@
    * @return A collection of stats per job
    * @throws LensException
    */
-  Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user,
+  Collection<SchedulerJobStats> getAllJobStats(String state, String user,
       String jobName, long startTime, long endTime) throws LensException;
 
   /**
    * Returns stats for a job.
    *
-   * @param sessionHandle handle for session.
    * @param handle        handle for the job
    * @param state         filter for status, if specified only jobs in that state will be returned,
    *                      if null no entries will be removed from result
@@ -172,19 +168,18 @@
    * @param endTime       if specified only instances with scheduleTime before this time will be considered.
    * @throws LensException the lens exception
    */
-  SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state,
+  SchedulerJobStats getJobStats(SchedulerJobHandle handle, String state,
       long startTime, long endTime) throws LensException;
 
   /**
    * Returns handles for last <code>numResults</code> instances for the job.
    *
-   * @param sessionHandle handle for the session.
    * @param jobHandle     handle for the job
    * @param numResults    - number of results to be returned, default 100.
    * @return list of instance ids for the job
    * @throws LensException the lens exception
    */
-  List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle,
+  List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle jobHandle,
       Long numResults) throws LensException;
 
   /**
@@ -215,12 +210,16 @@
   /**
    * Instance details for an instance.
    *
-   * @param sessionHandle  handle for the session.
    * @param instanceHandle handle for the instance.
    * @return details for the instance.
    * @throws LensException the lens exception
    */
-  SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle,
-      SchedulerJobInstanceHandle instanceHandle) throws LensException;
+  SchedulerJobInstanceInfo getInstanceDetails(SchedulerJobInstanceHandle instanceHandle) throws LensException;
 
+  /**
+   * Create session as user for scheduling the job with no auth.
+   * @param user
+   * @return LensSessionHandle
+   */
+  LensSessionHandle openSessionAsUser(String user) throws LensException;
 }
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
index 4491261..2009a20 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
@@ -138,13 +138,14 @@
     Trigger trigger;
     if (frequency.getEnum() != null) { //for enum expression:  create a trigger using calendar interval
       CalendarIntervalScheduleBuilder scheduleBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule()
-          .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum()))
-          .withMisfireHandlingInstructionIgnoreMisfires();
+        .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum()))
+        .withMisfireHandlingInstructionIgnoreMisfires();
       trigger = TriggerBuilder.newTrigger().withIdentity(jobHandle, ALARM_SERVICE).startAt(start.toDate())
-          .endAt(end.toDate()).withSchedule(scheduleBuilder).build();
+        .endAt(end.toDate()).withSchedule(scheduleBuilder).build();
     } else { // for cron expression create a cron trigger
       trigger = TriggerBuilder.newTrigger().withIdentity(jobHandle, ALARM_SERVICE).startAt(start.toDate())
-          .endAt(end.toDate()).withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression())).build();
+        .endAt(end.toDate()).withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression())
+          .withMisfireHandlingInstructionIgnoreMisfires()).build();
     }
 
     // Tell quartz to run the job using our trigger
@@ -190,7 +191,7 @@
     try {
       return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS));
     } catch (SchedulerException e) {
-      log.error("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e);
+      log.error("Failed to remove alarm triggers for job with jobHandle: {}", jobHandle);
       throw new LensException("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e);
     }
   }
@@ -199,8 +200,8 @@
     try {
       return scheduler.checkExists(JobKey.jobKey(handle.getHandleIdString(), LENS_JOBS));
     } catch (SchedulerException e) {
-      log.error("Failed to check the job with jobHandle: " + handle, e);
-      throw new LensException("Failed to check the job with jobHandle: " + handle, e);
+      log.error("Failed to check the job with jobHandle: {}", handle);
+      return false;
     }
   }
 
@@ -208,7 +209,7 @@
     try {
       scheduler.pauseJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS));
     } catch (SchedulerException e) {
-      log.error("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e);
+      log.error("Failed to pause alarm triggers for job with jobHandle: {}", jobHandle);
       throw new LensException("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e);
     }
   }
@@ -217,7 +218,7 @@
     try {
       scheduler.resumeJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS));
     } catch (SchedulerException e) {
-      log.error("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e);
+      log.error("Failed to resume alarm triggers for job with jobHandle: {}", jobHandle);
       throw new LensException("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e);
     }
   }
@@ -230,17 +231,17 @@
       DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime());
       SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle"));
       SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime,
-          SchedulerAlarmEvent.EventType.SCHEDULE, null);
+        SchedulerAlarmEvent.EventType.SCHEDULE, null);
       try {
         LensEventService eventService = LensServices.get().getService(LensEventService.NAME);
         eventService.notifyEvent(alarmEvent);
         if (jobExecutionContext.getNextFireTime() == null) {
           eventService
-              .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null));
+            .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null));
         }
       } catch (LensException e) {
         log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}",
-            jobHandle.getHandleIdString(), nominalTime.toString(), e);
+          jobHandle.getHandleIdString(), nominalTime.toString());
         throw new JobExecutionException("Failed to notify alarmEvent", e);
       }
     }
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java
index 8603edf..7a0b485 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/ScheduleResource.java
@@ -23,6 +23,7 @@
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
+import javax.xml.bind.JAXBElement;
 
 import org.apache.lens.api.APIResult;
 import org.apache.lens.api.LensSessionHandle;
@@ -39,6 +40,7 @@
 @Path("scheduler")
 @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
 public class ScheduleResource {
+  private static final ObjectFactory OBJECT_FACTORY = new ObjectFactory();
 
   public enum INSTANCE_ACTIONS {
     KILL, RERUN;
@@ -94,21 +96,22 @@
                                                 @DefaultValue("user") @QueryParam("user") String user,
                                                 @DefaultValue("-1") @QueryParam("start") long start,
                                                 @DefaultValue("-1") @QueryParam("end") long end) throws LensException {
-    return getSchedulerService().getAllJobStats(sessionId, status, user, jobName, start, end);
+    validateSession(sessionId);
+    return getSchedulerService().getAllJobStats(status, user, jobName, start, end);
   }
 
   @GET
   @Path("jobs/{jobHandle}")
-  public XJob getJobDefinition(@QueryParam("sessionid") LensSessionHandle sessionId,
+  public JAXBElement<XJob> getJobDefinition(@QueryParam("sessionid") LensSessionHandle sessionId,
                                @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException {
 
-    return getSchedulerService().getJobDefinition(sessionId, jobHandle);
+    return OBJECT_FACTORY.createJob(getSchedulerService().getJobDefinition(jobHandle));
   }
 
   @DELETE
   @Path("jobs/{jobHandle}")
   public APIResult deleteJob(@QueryParam("sessionid") LensSessionHandle sessionId,
-                             @QueryParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException {
+                             @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException {
     validateSession(sessionId);
     getSchedulerService().deleteJob(sessionId, jobHandle);
     return APIResult.success();
@@ -155,11 +158,11 @@
   }
 
   @GET
-  @Path("jobs/{jobHandle}/stats")
+  @Path("jobs/{jobHandle}/info")
   public SchedulerJobInfo getJobDetails(@QueryParam("sessionid") LensSessionHandle sessionId,
                                         @PathParam("jobHandle") SchedulerJobHandle jobHandle) throws LensException {
     validateSession(sessionId);
-    return getSchedulerService().getJobDetails(sessionId, jobHandle);
+    return getSchedulerService().getJobDetails(jobHandle);
   }
 
   @GET
@@ -168,7 +171,7 @@
                                                       @PathParam("jobHandle") SchedulerJobHandle jobHandle,
                                                       @QueryParam("numResults") Long numResults) throws LensException {
     validateSession(sessionId);
-    return getSchedulerService().getJobInstances(sessionId, jobHandle, numResults);
+    return getSchedulerService().getJobInstances(jobHandle, numResults);
   }
 
   @GET
@@ -177,34 +180,33 @@
                                                      @PathParam("instanceHandle")
                                                      SchedulerJobInstanceHandle instanceHandle) throws LensException {
     validateSession(sessionId);
-    return getSchedulerService().getInstanceDetails(sessionId, instanceHandle);
+    return getSchedulerService().getInstanceDetails(instanceHandle);
   }
 
   @POST
   @Path("instances/{instanceHandle}")
   public APIResult updateInstance(@QueryParam("sessionid") LensSessionHandle sessionId,
-                                @PathParam("instanceHandle") SchedulerJobInstanceHandle instanceHandle,
-                                @QueryParam("action") INSTANCE_ACTIONS action) throws LensException {
+    @PathParam("instanceHandle") SchedulerJobInstanceHandle instanceHandle,
+    @QueryParam("action") INSTANCE_ACTIONS action) throws LensException {
+    APIResult res = null;
     validateSession(sessionId);
-
-    APIResult res;
     switch (action) {
     case KILL:
       if (getSchedulerService().killInstance(sessionId, instanceHandle)) {
         res = new APIResult(APIResult.Status.SUCCEEDED,
-            "Killing the instance with id " + instanceHandle + " was successful");
+          "Killing the instance with id " + instanceHandle + " was successful");
       } else {
         res = new APIResult(APIResult.Status.FAILED,
-            "Killing the instance with id " + instanceHandle + " was not successful");
+          "Killing the instance with id " + instanceHandle + " was not successful");
       }
       break;
     case RERUN:
       if (getSchedulerService().rerunInstance(sessionId, instanceHandle)) {
         res = new APIResult(APIResult.Status.SUCCEEDED,
-            "Rerunning the instance with id " + instanceHandle + " was successful");
+          "Rerunning the instance with id " + instanceHandle + " was successful");
       } else {
         res = new APIResult(APIResult.Status.FAILED,
-            "Rerunning the instance with id " + instanceHandle + " was not successful");
+          "Rerunning the instance with id " + instanceHandle + " was not successful");
       }
       break;
     default:
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
index 7a2b06a..6a4c77b 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
@@ -286,7 +286,7 @@
     protected static final String COLUMN_USER = "username";
     protected static final String COLUMN_STATUS = "status";
     protected static final String COLUMN_CREATED_ON = "createdon";
-    protected static final String COLUMN_SCHEDULE_TIME = "schedultime";
+    protected static final String COLUMN_SCHEDULE_TIME = "scheduledtime";
     protected static final String COLUMN_MODIFIED_ON = "modifiedon";
     protected static final String COLUMN_JOB_ID = "jobid";
     protected static final String COLUMN_SESSION_HANDLE = "sessionhandle";
@@ -376,8 +376,8 @@
     public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun instanceRun) throws SQLException {
       String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " VALUES(?,?,?,?,?,?,?,?)";
       return runner.update(insetSQL, instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(),
-          instanceRun.getSessionHandle().toString(), instanceRun.getStartTime(), instanceRun.getEndTime(),
-          instanceRun.getResultPath(),
+          instanceRun.getSessionHandle() == null ? "" : instanceRun.getSessionHandle().toString(),
+          instanceRun.getStartTime(), instanceRun.getEndTime(), instanceRun.getResultPath(),
           instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(),
           instanceRun.getInstanceState().name());
     }
@@ -665,7 +665,7 @@
           "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, "
               + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME
               + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE
-              + " TEXT, " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID
+              + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID
               + ")" + ")";
       runner.update(createSQL);
     }
@@ -709,7 +709,7 @@
           "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, "
               + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME
               + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024),"
-              + COLUMN_QUERY_HANDLE + " VARCHAR(1024), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( "
+              + COLUMN_QUERY_HANDLE + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( "
               + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")";
       runner.update(createSQL);
     }
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
index 7323add..d1d4a68 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
@@ -19,9 +19,7 @@
 package org.apache.lens.server.scheduler;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.lens.api.LensConf;
 import org.apache.lens.api.LensSessionHandle;
@@ -33,9 +31,10 @@
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.events.AsyncEventListener;
 import org.apache.lens.server.api.events.SchedulerAlarmEvent;
+import org.apache.lens.server.api.metastore.CubeMetastoreService;
 import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.api.scheduler.SchedulerService;
-import org.apache.lens.server.query.QueryExecutionServiceImpl;
+import org.apache.lens.server.api.session.SessionService;
 import org.apache.lens.server.util.UtilityMethods;
 
 import org.joda.time.DateTime;
@@ -55,14 +54,48 @@
   protected QueryExecutionService queryService;
   private SchedulerDAO schedulerDAO;
   private SchedulerService schedulerService;
+  private SessionService sessionService;
+  private CubeMetastoreService cubeMetastoreService;
 
   public SchedulerEventListener(SchedulerDAO schedulerDAO) {
     super(CORE_POOL_SIZE);
     this.queryService = LensServices.get().getService(QueryExecutionService.NAME);
     this.schedulerService = LensServices.get().getService(SchedulerService.NAME);
+    this.sessionService = LensServices.get().getService(SessionService.NAME);
+    this.cubeMetastoreService = LensServices.get().getService(CubeMetastoreService.NAME);
     this.schedulerDAO = schedulerDAO;
   }
 
+  private LensSessionHandle getSessionHandle(String user) throws LensException {
+    return schedulerService.openSessionAsUser(user);
+  }
+
+  private void setSessionConf(LensSessionHandle sessionHandle, XJob job) throws LensException {
+    XExecution execution = job.getExecution();
+    XSessionType executionSession = execution.getSession();
+    cubeMetastoreService.setCurrentDatabase(sessionHandle, executionSession.getDb());
+    List<MapType> sessionConfList = executionSession.getConf();
+    for (MapType element : sessionConfList) {
+      sessionService.setSessionParameter(sessionHandle, element.getKey(), element.getValue());
+    }
+    List<ResourcePath> resourceList = executionSession.getResourcePath();
+    for (ResourcePath path : resourceList) {
+      sessionService.addResource(sessionHandle, path.getType(), path.getPath());
+    }
+  }
+
+  private LensConf getLensConf(XJob job, SchedulerJobInstanceHandle instanceHandle, DateTime scheduledTime) {
+    List<MapType> jobConf = job.getExecution().getQuery().getConf();
+    LensConf queryConf = new LensConf();
+    for (MapType element : jobConf) {
+      queryConf.addProperty(element.getKey(), element.getValue());
+    }
+    queryConf.addProperty(JOB_INSTANCE_ID_KEY, instanceHandle.getHandleId());
+    // Current time is used for resolving date.
+    queryConf.addProperty(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, scheduledTime.getMillis());
+    return queryConf;
+  }
+
   /**
    * @param event the event
    */
@@ -70,14 +103,6 @@
   public void process(SchedulerAlarmEvent event) {
     DateTime scheduledTime = event.getNominalTime();
     SchedulerJobHandle jobHandle = event.getJobHandle();
-    if (event.getType() == SchedulerAlarmEvent.EventType.EXPIRE) {
-      try {
-        schedulerService.expireJob(null, jobHandle);
-      } catch (LensException e) {
-        log.error("Error while expiring the job", e);
-      }
-      return;
-    }
     /*
      * Get the job from the store.
      * Create an instance.
@@ -86,75 +111,69 @@
      * If successfully submitted change the status to running.
      * Otherwise update the status to killed.
      */
-    //TODO: Get the job status and if it is not Scheduled, don't do anything.
     XJob job = schedulerDAO.getJob(jobHandle);
     String user = schedulerDAO.getUser(jobHandle);
     SchedulerJobInstanceHandle instanceHandle = event.getPreviousInstance() == null
                                                 ? UtilityMethods.generateSchedulerJobInstanceHandle()
                                                 : event.getPreviousInstance();
-    Map<String, String> conf = new HashMap<>();
-    LensSessionHandle sessionHandle = null;
-    try {
-      // Open the session with the user who submitted the job.
-      sessionHandle = ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionServiceImpl.NAME))
-          .openSession(user, "dummy", conf, false);
-    } catch (LensException e) {
-      log.error("Error occurred while opening a session ", e);
-      return;
-    }
     SchedulerJobInstanceInfo instance = null;
     SchedulerJobInstanceRun run = null;
-    // Session needs to be closed after the launch.
+    LensSessionHandle sessionHandle = null;
+
     try {
-      long scheduledTimeMillis = scheduledTime.getMillis();
-      String query = job.getExecution().getQuery().getQuery();
-      List<MapType> jobConf = job.getExecution().getQuery().getConf();
-      LensConf queryConf = new LensConf();
-      for (MapType element : jobConf) {
-        queryConf.addProperty(element.getKey(), element.getValue());
+      sessionHandle = getSessionHandle(user);
+      setSessionConf(sessionHandle, job);
+      if (event.getType() == SchedulerAlarmEvent.EventType.EXPIRE) {
+        try {
+          log.info("Expiring job with handle {}", jobHandle);
+          schedulerService.expireJob(sessionHandle, jobHandle);
+        } catch (LensException e) {
+          log.error("Error while expiring the job", e);
+        }
+        return;
       }
-      queryConf.addProperty(JOB_INSTANCE_ID_KEY, instanceHandle.getHandleId());
-      // Current time is used for resolving date.
-      queryConf.addProperty(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, scheduledTime.getMillis());
-      String queryName = job.getName();
-      queryName += "-" + scheduledTime.getMillis();
+      long scheduledTimeMillis = scheduledTime.getMillis();
       // If the instance is new then create otherwise get from the store
       if (event.getPreviousInstance() == null) {
         instance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, scheduledTimeMillis,
-            new ArrayList<SchedulerJobInstanceRun>());
+          new ArrayList<SchedulerJobInstanceRun>());
+        // Store the instance
+        if (schedulerDAO.storeJobInstance(instance) != 1) {
+          log.error("Store was unsuccessful for instance {} of job {} ", instanceHandle, jobHandle);
+          return;
+        }
       } else {
         instance = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
       }
       // Next run of the instance
       long currentTime = System.currentTimeMillis();
-      run = new SchedulerJobInstanceRun(instanceHandle, instance.getInstanceRunList().size() + 1, sessionHandle,
-          currentTime, 0, "N/A", null, SchedulerJobInstanceState.WAITING);
+      run = new SchedulerJobInstanceRun(instanceHandle, instance.getInstanceRunList().size() + 1, null, currentTime, 0,
+        "N/A", null, SchedulerJobInstanceState.WAITING);
       instance.getInstanceRunList().add(run);
-      boolean success;
-      if (event.getPreviousInstance() == null) {
-        success = schedulerDAO.storeJobInstance(instance) == 1;
-        if (!success) {
-          log.error(
-              "Exception occurred while storing the instance for instance handle " + instance + " of job " + jobHandle);
-          return;
-        }
-      }
-      success = schedulerDAO.storeJobInstanceRun(run) == 1;
-      if (!success) {
-        log.error(
-            "Exception occurred while storing the instance for instance handle " + instance + " of job " + jobHandle);
+      if (schedulerDAO.storeJobInstanceRun(run) != 1) {
+        log.error("Exception occurred while storing the instance run for instance handle {} of job {}", instance,
+          jobHandle);
         return;
       }
-
+      run.setSessionHandle(sessionHandle);
+      LensConf queryConf = getLensConf(job, instanceHandle, scheduledTime);
+      // Query Launch
+      String query = job.getExecution().getQuery().getQuery();
+      String queryName = job.getName();
+      queryName += "-" + scheduledTimeMillis;
       QueryHandle handle = queryService.executeAsync(sessionHandle, query, queryConf, queryName);
+      log.info("Running instance {} of job {} with run {} with query handle {}", instanceHandle, jobHandle,
+        run.getRunId(), handle);
       run.setQueryHandle(handle);
       run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RUN));
       run.setEndTime(System.currentTimeMillis());
+      // Update run
       schedulerDAO.updateJobInstanceRun(run);
+      log.info("Successfully updated instance run with instance {} of job {}", instanceHandle, jobHandle);
     } catch (LensException | InvalidStateTransitionException e) {
       log.error(
-          "Exception occurred while launching the job instance for " + jobHandle + " for nominal time " + scheduledTime
-              .getMillis(), e);
+        "Exception occurred while launching the job instance for " + jobHandle + " for nominal time " + scheduledTime
+          .getMillis(), e);
       try {
         run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_FAILURE));
         run.setEndTime(System.currentTimeMillis());
@@ -163,9 +182,9 @@
         log.error("Can't make transition for instance " + instance.getId() + " of job " + instance.getJobId(), e);
       }
     } finally {
+      // Session needs to be closed after the launch.
       try {
-        ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionServiceImpl.NAME))
-            .closeSession(sessionHandle);
+        sessionService.closeSession(sessionHandle);
       } catch (LensException e) {
         log.error("Error closing session ", e);
       }
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
index 5b12720..077d531 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
@@ -56,10 +56,10 @@
       return;
     }
     SchedulerJobInstanceInfo info = schedulerDAO
-        .getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle));
+      .getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle));
     List<SchedulerJobInstanceRun> runList = info.getInstanceRunList();
     if (runList.size() == 0) {
-      log.error("No instance run for " + instanceHandle + " with query " + queryContext.getQueryHandle());
+      log.error("No instance run for {} with query {}", instanceHandle, queryContext.getQueryHandle());
       return;
     }
     SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
@@ -78,8 +78,10 @@
       }
       latestRun.setEndTime(System.currentTimeMillis());
       latestRun.setInstanceState(state);
-      latestRun.setResultPath(queryContext.getDriverResultPath());
+      latestRun.setResultPath(queryContext.getResultSetPath());
       schedulerDAO.updateJobInstanceRun(latestRun);
+      log.info("Updated instance run {} for instance {} for job {} to {}", latestRun.getRunId(), info.getId(),
+        info.getJobId(), state);
     } catch (InvalidStateTransitionException e) {
       log.error("Instance Transition Failed ", e);
     }
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
index 14ca32d..28d7e27 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.lens.server.scheduler;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.lens.api.LensConf;
@@ -94,6 +95,12 @@
     }
   }
 
+  private void doesSessionBelongToUser(LensSessionHandle sessionHandle, String user) throws LensException {
+    LensSessionImpl session = getSession(sessionHandle);
+    if (!session.getLoggedInUser().equals(user))
+      throw new LensException("Logged in user " + session.getLoggedInUser() + " is not same as " + user);
+  }
+
   @Override
   public synchronized void start() {
     super.start();
@@ -113,6 +120,15 @@
    * {@inheritDoc}
    */
   @Override
+  public LensSessionHandle openSessionAsUser(String user) throws LensException {
+    // Open session with no auth
+    return openSession(user, "Mimbulus Mimbletonia", new HashMap<String, String>(), false);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public SchedulerJobHandle submitJob(LensSessionHandle sessionHandle, XJob job) throws LensException {
     LensSessionImpl session = getSession(sessionHandle);
     // Validate XJob
@@ -121,8 +137,9 @@
     long createdOn = System.currentTimeMillis();
     long modifiedOn = createdOn;
     SchedulerJobInfo info = new SchedulerJobInfo(handle, job, session.getLoggedInUser(), SchedulerJobState.NEW,
-        createdOn, modifiedOn);
+      createdOn, modifiedOn);
     if (schedulerDAO.storeJob(info) == 1) {
+      log.info("Successfully submitted job with handle {}", handle);
       return handle;
     } else {
       throw new LensException("Could not Submit the job");
@@ -138,6 +155,7 @@
   @Override
   public boolean scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException {
     SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    doesSessionBelongToUser(sessionHandle, jobInfo.getUserName());
     XJob job = jobInfo.getJob();
     DateTime start = new DateTime(job.getStartTime().toGregorianCalendar().getTime());
     DateTime end = new DateTime(job.getEndTime().toGregorianCalendar().getTime());
@@ -145,7 +163,8 @@
     // check query
     checkQuery(sessionHandle, job);
     alarmService.schedule(start, end, frequency, jobHandle.getHandleIdString());
-    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SCHEDULE) == 1;
+    log.info("Successfully scheduled job with handle {} in AlarmService", jobHandle);
+    return setStateOfJob(jobInfo, SchedulerJobEvent.ON_SCHEDULE) == 1;
   }
 
   private void checkQuery(LensSessionHandle sessionHandle, XJob job) throws LensException {
@@ -156,7 +175,7 @@
     }
     queryConf.addProperty(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false);
     queryService.estimate(LensServices.get().getLogSegregationContext().getLogSegragationId(), sessionHandle,
-        job.getExecution().getQuery().getQuery(), queryConf);
+      job.getExecution().getQuery().getQuery(), queryConf);
     return;
   }
 
@@ -171,7 +190,7 @@
    * {@inheritDoc}
    */
   @Override
-  public XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException {
+  public XJob getJobDefinition(SchedulerJobHandle jobHandle) throws LensException {
     return schedulerDAO.getJob(jobHandle);
   }
 
@@ -179,8 +198,7 @@
    * {@inheritDoc}
    */
   @Override
-  public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle)
-    throws LensException {
+  public SchedulerJobInfo getJobDetails(SchedulerJobHandle jobHandle) throws LensException {
     return schedulerDAO.getSchedulerJobInfo(jobHandle);
   }
 
@@ -191,8 +209,8 @@
   public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, XJob newJobDefinition)
     throws LensException {
     SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    doesSessionBelongToUser(sessionHandle, jobInfo.getUserName());
     // This will allow only the job definition and configuration change.
-    // TODO: fix start and end time changes
     jobInfo.setJob(newJobDefinition);
     jobInfo.setModifiedOn(System.currentTimeMillis());
     int updated = schedulerDAO.updateJob(jobInfo);
@@ -204,10 +222,13 @@
    */
   @Override
   public boolean expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException {
+    SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    doesSessionBelongToUser(sessionHandle, info.getUserName());
     if (alarmService.checkExists(jobHandle)) {
       alarmService.unSchedule(jobHandle);
+      log.info("Successfully unscheduled the job with handle {} in AlarmService ", jobHandle);
     }
-    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_EXPIRE) == 1;
+    return setStateOfJob(info, SchedulerJobEvent.ON_EXPIRE) == 1;
   }
 
   /**
@@ -215,8 +236,10 @@
    */
   @Override
   public boolean suspendJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException {
+    SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    doesSessionBelongToUser(sessionHandle, info.getUserName());
     alarmService.pauseJob(jobHandle);
-    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SUSPEND) == 1;
+    return setStateOfJob(info, SchedulerJobEvent.ON_SUSPEND) == 1;
   }
 
   /**
@@ -224,8 +247,10 @@
    */
   @Override
   public boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException {
+    SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    doesSessionBelongToUser(sessionHandle, info.getUserName());
     alarmService.resumeJob(jobHandle);
-    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_RESUME) == 1;
+    return setStateOfJob(info, SchedulerJobEvent.ON_RESUME) == 1;
   }
 
   /**
@@ -233,18 +258,21 @@
    */
   @Override
   public boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException {
+    SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    doesSessionBelongToUser(sessionHandle, info.getUserName());
     if (alarmService.checkExists(jobHandle)) {
       alarmService.unSchedule(jobHandle);
+      log.info("Successfully unscheduled the job with handle {} ", jobHandle);
     }
-    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_DELETE) == 1;
+    return setStateOfJob(info, SchedulerJobEvent.ON_DELETE) == 1;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user,
-      String jobName, long startTime, long endTime) throws LensException {
+  public Collection<SchedulerJobStats> getAllJobStats(String state, String user, String jobName, long startTime,
+    long endTime) throws LensException {
     return null;
   }
 
@@ -252,8 +280,8 @@
    * {@inheritDoc}
    */
   @Override
-  public SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state,
-      long startTime, long endTime) throws LensException {
+  public SchedulerJobStats getJobStats(SchedulerJobHandle handle, String state, long startTime, long endTime)
+    throws LensException {
     return null;
   }
 
@@ -264,6 +292,7 @@
   public boolean rerunInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle)
     throws LensException {
     SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
+    doesSessionBelongToUser(sessionHandle, schedulerDAO.getUser(instanceInfo.getJobId()));
     if (schedulerDAO.getJobState(instanceInfo.getJobId()) != SchedulerJobState.SCHEDULED) {
       throw new LensException("Job with handle " + instanceInfo.getJobId() + " is not scheduled");
     }
@@ -277,8 +306,9 @@
     try {
       latestRun.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RERUN);
       getEventService().notifyEvent(
-          new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()),
-              SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle));
+        new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()),
+          SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle));
+      log.info("Rerunning the instance with {} for job {} ", instanceHandle, instanceInfo.getJobId());
     } catch (InvalidStateTransitionException e) {
       throw new LensException("Invalid State Transition ", e);
     }
@@ -289,8 +319,8 @@
    * {@inheritDoc}
    */
   @Override
-  public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle,
-      Long numResults) throws LensException {
+  public List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle jobHandle, Long numResults)
+    throws LensException {
     return schedulerDAO.getJobInstances(jobHandle);
   }
 
@@ -301,15 +331,28 @@
      * Get the query handle from the latest run.
      */
     SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
+    doesSessionBelongToUser(sessionHandle, schedulerDAO.getUser(instanceInfo.getJobId()));
     List<SchedulerJobInstanceRun> runList = instanceInfo.getInstanceRunList();
     if (runList.size() == 0) {
       throw new LensException("Job instance " + instanceHandle + " is not yet run");
     }
     SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
     QueryHandle handle = latestRun.getQueryHandle();
-    if (handle.getHandleIdString().isEmpty()) {
-      return false;
+    if (handle == null || handle.getHandleIdString().isEmpty()) {
+      SchedulerJobInstanceState state = latestRun.getInstanceState();
+      try {
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_KILL);
+      } catch (InvalidStateTransitionException e) {
+        throw new LensException("Invalid Transition of state ", e);
+      }
+      latestRun.setEndTime(System.currentTimeMillis());
+      latestRun.setInstanceState(state);
+      schedulerDAO.updateJobInstanceRun(latestRun);
+      log.info("Killing instance with {} for job {} ", instanceHandle, instanceInfo.getJobId());
+      return true;
     }
+    log.info("Killing instance with {} for job {} with query handle {} ", instanceHandle, instanceInfo.getJobId(),
+      handle);
     // This will cause the QueryEnd event which will set the status of the instance to KILLED.
     return queryService.cancelQuery(sessionHandle, handle);
   }
@@ -318,19 +361,22 @@
    * {@inheritDoc}
    */
   @Override
-  public SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle,
-      SchedulerJobInstanceHandle instanceHandle) throws LensException {
+  public SchedulerJobInstanceInfo getInstanceDetails(SchedulerJobInstanceHandle instanceHandle) throws LensException {
     return schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
   }
 
-  private int setStateOfJob(SchedulerJobHandle handle, SchedulerJobEvent event) throws LensException {
+  private int setStateOfJob(SchedulerJobInfo info, SchedulerJobEvent event) throws LensException {
     try {
-      SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(handle);
       SchedulerJobState currentState = info.getJobState();
       SchedulerJobState nextState = currentState.nextTransition(event);
       info.setJobState(nextState);
       info.setModifiedOn(System.currentTimeMillis());
-      return schedulerDAO.updateJobStatus(info);
+      int ret = schedulerDAO.updateJobStatus(info);
+      if (ret == 1) {
+        log.info("Successfully changed the status of job with handle {} from {} to {}", info.getId(), currentState,
+          nextState);
+      }
+      return ret;
     } catch (InvalidStateTransitionException e) {
       throw new LensException("Invalid state ", e);
     }
diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java
index 7b610de..02f371c 100644
--- a/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java
+++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/AlarmServiceTest.java
@@ -159,9 +159,10 @@
     SchedulerJobHandle jobHandle = new SchedulerJobHandle(UUID.randomUUID());
     System.out.println("jobHandle = " + jobHandle);
     XFrequency frequency = new XFrequency();
-    frequency.setCronExpression("0/1 * * * * ?");
+    frequency.setCronExpression("0 0 12 * * ?");
     alarmService.schedule(start, end, frequency, jobHandle.toString());
-    Thread.sleep(2000);
+    Thread.sleep(1000);
+    alarmService.unSchedule(jobHandle);
     // Assert that the events are fired and at per second interval.
     assertTrue(events.size() > 1);
   }
diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
index ce744af..e182c72 100644
--- a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
+++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
@@ -22,7 +22,6 @@
 import static org.mockito.Matchers.anyString;
 
 import java.util.GregorianCalendar;
-import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 
@@ -42,7 +41,6 @@
 import org.apache.lens.server.api.query.QueryEnded;
 import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.api.scheduler.SchedulerService;
-import org.apache.lens.server.query.QueryExecutionServiceImpl;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -59,8 +57,7 @@
 
   SchedulerServiceImpl scheduler;
   EventServiceImpl eventService;
-  LensSessionHandle sessionHandle = null;
-
+  String user = "someuser";
   @BeforeMethod
   public void setup() throws Exception {
     System.setProperty(LensConfConstants.CONFIG_LOCATION, "target/test-classes/");
@@ -69,19 +66,20 @@
   private void setupQueryService() throws Exception {
     QueryExecutionService queryExecutionService = PowerMockito.mock(QueryExecutionService.class);
     scheduler.setQueryService(queryExecutionService);
+    PowerMockito.when(
+      scheduler.getQueryService().estimate(anyString(), any(LensSessionHandle.class), anyString(), any(LensConf.class)))
+      .thenReturn(null);
     PowerMockito.when(scheduler.getQueryService()
-        .estimate(anyString(), any(LensSessionHandle.class), anyString(), any(LensConf.class))).thenReturn(null);
-    PowerMockito.when(scheduler.getQueryService()
-        .executeAsync(any(LensSessionHandle.class), anyString(), any(LensConf.class), anyString()))
-        .thenReturn(new QueryHandle(UUID.randomUUID()));
+      .executeAsync(any(LensSessionHandle.class), anyString(), any(LensConf.class), anyString()))
+      .thenReturn(new QueryHandle(UUID.randomUUID()));
     PowerMockito.when(scheduler.getQueryService().cancelQuery(any(LensSessionHandle.class), any(QueryHandle.class)))
-        .thenReturn(true);
+      .thenReturn(true);
     scheduler.getSchedulerEventListener().setQueryService(queryExecutionService);
   }
 
   private QueryEnded mockQueryEnded(SchedulerJobInstanceHandle instanceHandle, QueryStatus.Status status) {
     QueryContext mockContext = PowerMockito.mock(QueryContext.class);
-    PowerMockito.when(mockContext.getDriverResultPath()).thenReturn("/tmp/query1/result");
+    PowerMockito.when(mockContext.getResultSetPath()).thenReturn("/tmp/query1/result");
     Configuration conf = new Configuration();
     // set the instance handle
     conf.set("job_instance_key", instanceHandle.getHandleIdString());
@@ -101,8 +99,7 @@
     scheduler = LensServices.get().getService(SchedulerService.NAME);
     eventService = LensServices.get().getService(EventServiceImpl.NAME);
     setupQueryService();
-    sessionHandle = ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionService.NAME))
-        .openSession("someuser", "test", new HashMap<String, String>(), false);
+    LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user);
     long currentTime = System.currentTimeMillis();
     XJob job = getTestJob("0/5 * * * * ?", currentTime, currentTime + 15000);
     SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job);
@@ -119,16 +116,18 @@
     Thread.sleep(2000);
     // Check the instance value
     SchedulerJobInstanceInfo info = scheduler.getSchedulerDAO()
-        .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId());
+      .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId());
     Assert.assertEquals(info.getInstanceRunList().size(), 1);
     Assert.assertEquals(info.getInstanceRunList().get(0).getResultPath(), "/tmp/query1/result");
     Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.SUCCEEDED);
+    scheduler.closeSession(sessionHandle);
   }
 
   @Test(priority = 2)
   public void testSuspendResume() throws Exception {
     long currentTime = System.currentTimeMillis();
     XJob job = getTestJob("0/10 * * * * ?", currentTime, currentTime + 180000);
+    LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user);
     SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job);
     Assert.assertNotNull(jobHandle);
     Assert.assertTrue(scheduler.suspendJob(sessionHandle, jobHandle));
@@ -138,6 +137,7 @@
     Thread.sleep(10000);
     Assert.assertTrue(scheduler.expireJob(sessionHandle, jobHandle));
     Assert.assertEquals(scheduler.getSchedulerDAO().getJobState(jobHandle), SchedulerJobState.EXPIRED);
+    scheduler.closeSession(sessionHandle);
   }
 
   @Test(priority = 2)
@@ -145,6 +145,7 @@
     long currentTime = System.currentTimeMillis();
 
     XJob job = getTestJob("0/10 * * * * ?", currentTime, currentTime + 180000);
+    LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user);
     SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job);
     // Wait for some instances.
     Thread.sleep(15000);
@@ -153,7 +154,7 @@
     eventService.notifyEvent(mockQueryEnded(instanceHandleList.get(0).getId(), QueryStatus.Status.FAILED));
     Thread.sleep(1000);
     SchedulerJobInstanceInfo info = scheduler.getSchedulerDAO()
-        .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId());
+      .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId());
     // First run
     Assert.assertEquals(info.getInstanceRunList().size(), 1);
     Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.FAILED);
@@ -170,13 +171,14 @@
     Assert.assertEquals(info.getInstanceRunList().get(1).getInstanceState(), SchedulerJobInstanceState.SUCCEEDED);
     Assert.assertTrue(scheduler.expireJob(sessionHandle, jobHandle));
     Assert.assertEquals(scheduler.getSchedulerDAO().getJobState(jobHandle), SchedulerJobState.EXPIRED);
+    scheduler.closeSession(sessionHandle);
   }
 
   @Test(priority = 2)
   public void testKillRunningInstance() throws Exception {
     long currentTime = System.currentTimeMillis();
-
     XJob job = getTestJob("0/5 * * * * ?", currentTime, currentTime + 180000);
+    LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user);
     SchedulerJobHandle jobHandle = scheduler.submitAndScheduleJob(sessionHandle, job);
     // Let it run
     Thread.sleep(6000);
@@ -184,7 +186,7 @@
     Assert.assertTrue(scheduler.killInstance(sessionHandle, instanceHandleList.get(0).getId()));
     Thread.sleep(2000);
     SchedulerJobInstanceInfo info = scheduler.getSchedulerDAO()
-        .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId());
+      .getSchedulerJobInstanceInfo(instanceHandleList.get(0).getId());
     Assert.assertEquals(info.getInstanceRunList().size(), 1);
     Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.RUNNING);
     // Query End event
@@ -194,6 +196,7 @@
     Assert.assertEquals(info.getInstanceRunList().get(0).getInstanceState(), SchedulerJobInstanceState.KILLED);
     Assert.assertTrue(scheduler.expireJob(sessionHandle, jobHandle));
     Assert.assertEquals(scheduler.getSchedulerDAO().getJobState(jobHandle), SchedulerJobState.EXPIRED);
+    scheduler.closeSession(sessionHandle);
   }
 
   private XTrigger getTestTrigger(String cron) {
@@ -211,7 +214,7 @@
     query.setQuery("select ID from test_table");
     execution.setQuery(query);
     XSessionType sessionType = new XSessionType();
-    sessionType.setDb("test");
+    sessionType.setDb("default");
     execution.setSession(sessionType);
     return execution;
   }