blob: e48053d7ae7397895e82a55e02305935230cd69d [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.core.job;
import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
import static org.apache.griffin.core.config.PropertiesConfig.livyConfMap;
import static org.apache.griffin.core.job.JobInstance.JOB_NAME;
import static org.apache.griffin.core.job.JobInstance.MEASURE_KEY;
import static org.apache.griffin.core.job.JobInstance.PREDICATES_KEY;
import static org.apache.griffin.core.job.JobInstance.PREDICATE_JOB_NAME;
import static org.apache.griffin.core.job.entity.LivySessionStates.State;
import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
import static org.apache.griffin.core.util.JsonUtil.toEntity;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.apache.griffin.core.job.factory.PredicatorFactory;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
import org.apache.griffin.core.util.JsonUtil;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* Simple implementation of the Quartz Job interface, submitting the
* griffin job to spark cluster via livy
*
* @see LivyTaskSubmitHelper#postToLivy(String)
* @see Job#execute(JobExecutionContext)
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Component
public class SparkSubmitJob implements Job {
private static final Logger LOGGER =
LoggerFactory.getLogger(SparkSubmitJob.class);
@Autowired
private JobInstanceRepo jobInstanceRepo;
@Autowired
private BatchJobOperatorImpl batchJobOp;
@Autowired
private Environment env;
@Autowired
private LivyTaskSubmitHelper livyTaskSubmitHelper;
@Value("${livy.need.queue:false}")
private boolean isNeedLivyQueue;
@Value("${livy.task.appId.retry.count:3}")
private int appIdRetryCount;
private GriffinMeasure measure;
private String livyUri;
private List<SegmentPredicate> mPredicates;
private JobInstanceBean jobInstance;
@Override
public void execute(JobExecutionContext context) {
JobDetail jd = context.getJobDetail();
try {
initParam(jd);
setLivyConf();
if (!success(mPredicates)) {
updateJobInstanceState(context);
return;
}
if (isNeedLivyQueue) {
//livy batch limit
livyTaskSubmitHelper.addTaskToWaitingQueue(jd);
} else {
saveJobInstance(jd);
}
} catch (Exception e) {
LOGGER.error("Post spark task ERROR.", e);
}
}
private void updateJobInstanceState(JobExecutionContext context)
throws IOException {
SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger();
int repeatCount = simpleTrigger.getRepeatCount();
int fireCount = simpleTrigger.getTimesTriggered();
if (fireCount > repeatCount) {
saveJobInstance(null, NOT_FOUND);
}
}
private String post2Livy() {
return livyTaskSubmitHelper.postToLivy(livyUri);
}
private boolean success(List<SegmentPredicate> predicates) {
if (CollectionUtils.isEmpty(predicates)) {
return true;
}
for (SegmentPredicate segPredicate : predicates) {
Predicator predicator = PredicatorFactory
.newPredicateInstance(segPredicate);
try {
if (predicator != null && !predicator.predicate()) {
return false;
}
} catch (Exception e) {
return false;
}
}
return true;
}
private void initParam(JobDetail jd) throws IOException {
mPredicates = new ArrayList<>();
jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap()
.getString(PREDICATE_JOB_NAME));
measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY),
GriffinMeasure.class);
livyUri = env.getProperty("livy.uri");
setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
// in order to keep metric name unique, we set job name
// as measure name at present
measure.setName(jd.getJobDataMap().getString(JOB_NAME));
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void setPredicates(String json) throws IOException {
if (StringUtils.isEmpty(json)) {
return;
}
List<SegmentPredicate> predicates = toEntity(json,
new TypeReference<List<SegmentPredicate>>() {
});
if (predicates != null) {
mPredicates.addAll(predicates);
}
}
private String escapeCharacter(String str, String regex) {
if (StringUtils.isEmpty(str)) {
return str;
}
String escapeCh = "\\" + regex;
return str.replaceAll(regex, escapeCh);
}
private String genEnv() {
ProcessType type = measure.getProcessType();
String env = type == BATCH ? ENV_BATCH : ENV_STREAMING;
return env.replaceAll("\\$\\{JOB_NAME}", measure.getName());
}
private void setLivyConf() throws IOException {
setLivyArgs();
}
private void setLivyArgs() throws IOException {
List<String> args = new ArrayList<>();
args.add(genEnv());
String measureJson = JsonUtil.toJsonWithFormat(measure);
// to fix livy bug: character will be ignored by livy
String finalMeasureJson = escapeCharacter(measureJson, "\\`");
LOGGER.info(finalMeasureJson);
args.add(finalMeasureJson);
args.add("raw,raw");
livyConfMap.put("args", args);
}
protected void saveJobInstance(JobDetail jd) throws SchedulerException,
IOException {
// If result is null, it may livy uri is wrong
// or livy parameter is wrong.
Map<String, Object> resultMap = post2LivyWithRetry();
String group = jd.getKey().getGroup();
String name = jd.getKey().getName();
batchJobOp.deleteJob(group, name);
LOGGER.info("Delete predicate job({},{}) SUCCESS.", group, name);
setJobInstance(resultMap, FOUND);
jobInstanceRepo.save(jobInstance);
}
private Map<String, Object> post2LivyWithRetry()
throws IOException {
String result = post2Livy();
Map<String, Object> resultMap = null;
if (result != null) {
resultMap = livyTaskSubmitHelper.retryLivyGetAppId(result, appIdRetryCount);
if (resultMap != null) {
livyTaskSubmitHelper.increaseCurTaskNum(Long.valueOf(
String.valueOf(resultMap.get("id"))).longValue());
}
}
return resultMap;
}
protected void saveJobInstance(String result, State state)
throws IOException {
TypeReference<HashMap<String, Object>> type =
new TypeReference<HashMap<String, Object>>() {
};
Map<String, Object> resultMap = null;
if (result != null) {
resultMap = toEntity(result, type);
}
setJobInstance(resultMap, state);
jobInstanceRepo.save(jobInstance);
}
private void setJobInstance(Map<String, Object> resultMap, State state) {
jobInstance.setState(state);
jobInstance.setPredicateDeleted(true);
if (resultMap != null) {
Object status = resultMap.get("state");
Object id = resultMap.get("id");
Object appId = resultMap.get("appId");
jobInstance.setState(status == null ? null : State.valueOf(status
.toString().toUpperCase()));
jobInstance.setSessionId(id == null ? null : Long.parseLong(id
.toString()));
jobInstance.setAppId(appId == null ? null : appId.toString());
}
}
}