/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.rerun.handler;

import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.LateProcess;
import org.apache.falcon.entity.v0.process.PolicyType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.RerunPolicyFactory;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Date;

/**
 * A rerun handler implementation for late-data reruns.
 *
 * @param <M> the type of delayed queue used to hold {@link LaterunEvent}s
 */
public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
AbstractRerunHandler<LaterunEvent, M> {
private Thread daemon;
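
/**
 * Schedules a late rerun for a completed workflow instance, or, once the
 * late cut-off has expired, cleans up the instance's recorded late-data paths.
 */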
@Override
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public void handleRerun(String cluster, String entityType, String entityName, String nominalTime,
String runId, String wfId, String parentId, String workflowUser, long msgReceivedTime) {
try {
Entity entity = EntityUtil.getEntity(entityType, entityName);
int intRunId = Integer.parseInt(runId);
Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
Long wait = getEventDelay(entity, nominalTime);
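// A wait of -1 means no late policy is configured or the cut-off has passed,
// so delete the recorded late-data paths instead of scheduling a rerun.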
if (wait == -1) {
LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName, entity.getACL().getOwner());
java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
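// The saved workflow properties carry the log directory and (for feed
// replication) the source cluster name, which locate the staged late-data record.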
String logDir = properties.getProperty("logDir");
String srcClusterName = properties.getProperty("srcClusterName");
Path lateLogPath = this.getLateLogPath(logDir,
EntityUtil.fromUTCtoURIDate(nominalTime), srcClusterName);
LOG.info("Going to delete path: {}", lateLogPath);
final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
Configuration conf = getConfiguration(storageEndpoint);
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
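// Recursively remove the staged late-data path if it exists.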
if (fs.exists(lateLogPath)) {
boolean deleted = fs.delete(lateLogPath, true);
if (deleted) {
LOG.info("Successfully deleted late file path: {}", lateLogPath);
}
}
return;
}
LOG.debug("Scheduling the late rerun for entity instance: {} ({}): {} And WorkflowId: {}",
entityType, entityName, nominalTime, wfId);
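// Enqueue the event; the consumer daemon fires the rerun once the wait interval elapses.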
LaterunEvent event = new LaterunEvent(cluster, wfId, parentId, msgInsertTime.getTime(),
wait, entityType, entityName, nominalTime, intRunId, workflowUser);
offerToQueue(event);
} catch (Exception e) {
if (e instanceof EntityNotRegisteredException) {
LOG.warn("Entity {} of type {} doesn't exist in config store. So late rerun "
+ "cannot be done for workflow ", entityName,
entityType, wfId);
return;
}
LOG.error("Unable to schedule late rerun for entity instance: {} ({}): {} And WorkflowId: {}",
entityType, entityName, nominalTime, wfId, e);
GenericAlert.alertLateRerunFailed(entityType, entityName,
nominalTime, wfId, workflowUser, runId, e.getMessage());
}
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
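
/**
 * Computes how long to wait before the late rerun may fire, or -1 when no
 * late policy is configured for the entity or the cut-off has already passed.
 */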
private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
LateProcess lateProcess = EntityUtil.getLateProcess(entity);
if (lateProcess == null) {
LOG.warn("Late run not applicable for entity: {} ({})", entity.getEntityType(), entity.getName());
return -1;
}
PolicyType latePolicy = lateProcess.getPolicy();
Date cutOffTime = getCutOffTime(entity, nominalTime);
Date now = new Date();
Long wait;
if (now.after(cutOffTime)) {
LOG.warn("Feed Cut Off time: {} has expired, Late Rerun can not be scheduled",
SchemaHelper.formatDateUTC(cutOffTime));
return -1;
} else {
AbstractRerunPolicy rerunPolicy = RerunPolicyFactory.getRetryPolicy(latePolicy);
wait = rerunPolicy.getDelay(lateProcess.getDelay(), instanceDate, cutOffTime);
}
return wait;
}
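
/** Returns a new Date offset from the given date by the supplied number of milliseconds. */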
public static Date addTime(Date date, long milliSecondsToAdd) {
return new Date(date.getTime() + milliSecondsToAdd);
}
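
/**
 * Computes the late-data cut-off for an entity instance. For a feed, this is
 * the nominal time plus the feed's late-arrival cut-off; for a process, the
 * latest cut-off across its late inputs, each computed from the input's end
 * instance plus the corresponding feed's late-arrival cut-off.
 */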
public static Date getCutOffTime(Entity entity, String nominalTime) throws FalconException {
ExpressionHelper evaluator = ExpressionHelper.get();
Date instanceStart = EntityUtil.parseDateUTC(nominalTime);
ExpressionHelper.setReferenceDate(instanceStart);
Date endTime;
Date feedCutOff = new Date(0);
if (entity.getEntityType() == EntityType.FEED) {
if (((Feed) entity).getLateArrival() == null) {
LOG.debug("Feed's {} late arrival cut-off is not configured, returning", entity.getName());
return feedCutOff;
}
String lateCutOff = ((Feed) entity).getLateArrival().getCutOff().toString();
endTime = EntityUtil.parseDateUTC(nominalTime);
long feedCutOffPeriod = evaluator.evaluate(lateCutOff, Long.class);
endTime = addTime(endTime, feedCutOffPeriod);
return endTime;
} else if (entity.getEntityType() == EntityType.PROCESS) {
Process process = (Process) entity;
ConfigurationStore store = ConfigurationStore.get();
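// Resolve each late input to its process input and feed definition so the
// feed's late-arrival cut-off can be evaluated.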
for (LateInput lp : process.getLateProcess().getLateInputs()) {
Feed feed = null;
String endInstanceTime = "";
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
if (input.getName().equals(lp.getInput())) {
endInstanceTime = input.getEnd();
feed = store.get(EntityType.FEED, input.getFeed());
break;
}
}
}
if (feed == null) {
throw new IllegalStateException("No such feed: " + lp.getInput());
}
if (feed.getLateArrival() == null) {
LOG.debug("Feed's {} late arrival cut-off is not configured, ignoring this feed", feed.getName());
continue;
}
String lateCutOff = feed.getLateArrival().getCutOff().toString();
endTime = evaluator.evaluate(endInstanceTime, Date.class);
long feedCutOffPeriod = evaluator.evaluate(lateCutOff, Long.class);
endTime = addTime(endTime, feedCutOffPeriod);
if (endTime.after(feedCutOff)) {
feedCutOff = endTime;
}
}
return feedCutOff;
} else {
throw new FalconException("Invalid entity while getting cut-off time:" + entity.getName());
}
}
@Override
public void init(M aDelayQueue) throws FalconException {
super.init(aDelayQueue);
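// The consumer daemon drains the delay queue and triggers late reruns as events mature.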
daemon = new Thread(new LateRerunConsumer(this));
daemon.setName("LaterunHandler");
daemon.setDaemon(true);
daemon.start();
LOG.info("Laterun Handler thread started");
}
@Override
public void close() throws FalconException {
daemon.interrupt();
super.close();
}
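
/**
 * Builds the path under which late-data records for the given instance were
 * staged; srcClusterName scopes the path further for feed replication.
 */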
public Path getLateLogPath(String logDir, String nominalTime, String srcClusterName) {
// srcClusterName is valid only in the case of feeds
return new Path(logDir + "/latedata/" + nominalTime + "/"
+ (srcClusterName == null ? "" : srcClusterName));
}
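
/** Creates a Hadoop Configuration whose default filesystem points at the given storage endpoint. */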
public static Configuration getConfiguration(String storageEndpoint) throws FalconException {
Configuration conf = new Configuration();
conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageEndpoint);
return conf;
}
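
// Late-data handling applies only to successful non-retention runs; DELETE
// (feed retention) operations and entities without a late process are skipped.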
@Override
public void onSuccess(WorkflowExecutionContext context) throws FalconException {
Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
//late data handling not applicable for feed retention action
if (context.getOperation() != WorkflowExecutionContext.EntityOperations.DELETE
&& EntityUtil.getLateProcess(entity) != null) {
handleRerun(context.getClusterName(), context.getEntityType(),
context.getEntityName(), context.getNominalTimeAsISO8601(),
context.getWorkflowRunIdString(), context.getWorkflowId(), context.getWorkflowParentId(),
context.getWorkflowUser(), context.getExecutionCompletionTime());
} else {
LOG.info("Late date handling not applicable for entityType: " + context.getEntityType()
+ ", entityName: " + context.getEntityName()
+ " operation: " + context.getOperation());
}
}
@Override
public void onFailure(WorkflowExecutionContext context) throws FalconException {
// do nothing since late data does not apply for failed workflows
}
@Override
public void onStart(WorkflowExecutionContext context) throws FalconException {
// do nothing
}
@Override
public void onSuspend(WorkflowExecutionContext context) throws FalconException {
// do nothing
}
@Override
public void onWait(WorkflowExecutionContext context) throws FalconException {
// do nothing
}
}