blob: 077d5314f35c8ef3f4c68fd39a8d1d96bbb8e82d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server.scheduler;
import java.util.List;
import org.apache.lens.api.error.InvalidStateTransitionException;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.api.scheduler.*;
import org.apache.lens.server.api.events.AsyncEventListener;
import org.apache.lens.server.api.query.QueryContext;
import org.apache.lens.server.api.query.QueryEnded;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SchedulerQueryEventListener extends AsyncEventListener<QueryEnded> {
private static final String JOB_INSTANCE_ID_KEY = "job_instance_key";
private static final int CORE_POOL_SIZE = 10;
private SchedulerDAO schedulerDAO;
public SchedulerQueryEventListener(SchedulerDAO schedulerDAO) {
super(CORE_POOL_SIZE);
this.schedulerDAO = schedulerDAO;
}
@Override
public void process(QueryEnded event) {
if (event.getCurrentValue() == QueryStatus.Status.CLOSED) {
return;
}
QueryContext queryContext = event.getQueryContext();
if (queryContext == null) {
log.warn("Could not find the context for {} for event:{}.", event.getQueryHandle(), event.getCurrentValue());
return;
}
String instanceHandle = queryContext.getConf().get(JOB_INSTANCE_ID_KEY);
if (instanceHandle == null) {
// Nothing to do
return;
}
SchedulerJobInstanceInfo info = schedulerDAO
.getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle));
List<SchedulerJobInstanceRun> runList = info.getInstanceRunList();
if (runList.size() == 0) {
log.error("No instance run for {} with query {}", instanceHandle, queryContext.getQueryHandle());
return;
}
SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
SchedulerJobInstanceState state = latestRun.getInstanceState();
try {
switch (event.getCurrentValue()) {
case CANCELED:
state = state.nextTransition(SchedulerJobInstanceEvent.ON_KILL);
break;
case SUCCESSFUL:
state = state.nextTransition(SchedulerJobInstanceEvent.ON_SUCCESS);
break;
case FAILED:
state = state.nextTransition(SchedulerJobInstanceEvent.ON_FAILURE);
break;
}
latestRun.setEndTime(System.currentTimeMillis());
latestRun.setInstanceState(state);
latestRun.setResultPath(queryContext.getResultSetPath());
schedulerDAO.updateJobInstanceRun(latestRun);
log.info("Updated instance run {} for instance {} for job {} to {}", latestRun.getRunId(), info.getId(),
info.getJobId(), state);
} catch (InvalidStateTransitionException e) {
log.error("Instance Transition Failed ", e);
}
}
}