/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.service;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Clusters;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.jdbc.BacklogMetricStore;
import org.apache.falcon.metrics.MetricNotificationService;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.MetricInfo;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine;
/**
 * Backlog Metric Emitter Service: tracks the SLA backlog of process instances and
 * publishes it as metrics to Graphite.
 */
public final class BacklogMetricEmitterService implements FalconService,
EntitySLAListener, WorkflowExecutionListener, ConfigurationChangeListener {
private static final String METRIC_PREFIX = StartupProperties.get().getProperty("falcon.graphite.prefix");
private static final String METRIC_SEPARATOR = ".";
private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs";
private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice."
+ "recheck.interval.millisecs";
private static final String DEFAULT_PIPELINE = "DEFAULT";
private static final Logger LOG = LoggerFactory.getLogger(BacklogMetricEmitterService.class);
private static BacklogMetricStore backlogMetricStore = new BacklogMetricStore();
private static final BacklogMetricEmitterService SERVICE = new BacklogMetricEmitterService();
private static MetricNotificationService metricNotificationService =
Services.get().getService(MetricNotificationService.SERVICE_NAME);
private static final List<LifeCycle> PROCESS_LIFE_CYCLE = Arrays.asList(LifeCycle.EXECUTION);
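/**
 * Returns the singleton instance of this service.
 */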
public static BacklogMetricEmitterService get() {
return SERVICE;
}
private BacklogMetricEmitterService() {
}
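// One scheduler periodically emits backlog metrics; the other rechecks instance status
// so that completed instances can be pruned from the backlog.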
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1);
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1);
public static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm'Z'");
format.setTimeZone(TimeZone.getTimeZone("UTC"));
return format;
}
};
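// In-memory map of each SLA-enabled process to its pending (SLA-missed) instances.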
private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();
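/**
 * On entity submission, starts backlog tracking for the process if it declares an SLA.
 */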
@Override
public void onAdd(Entity entity) throws FalconException {
addToBacklog(entity);
}
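/**
 * On entity removal, purges persisted instances, stops in-memory tracking and
 * deletes the corresponding Graphite metrics.
 */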
@Override
public void onRemove(Entity entity) throws FalconException {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() != null) {
backlogMetricStore.deleteEntityInstance(entity.getName());
entityBacklogs.remove(entity);
process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
for (Cluster cluster : process.getClusters().getClusters()) {
dropMetric(cluster.getName(), process);
}
}
}
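/**
 * Deletes the Graphite backlog metric for each pipeline of the process on the given
 * cluster, or for the DEFAULT pipeline when none is configured.
 */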
public void dropMetric(String clusterName, Process process) {
String pipelinesStr = process.getPipelines();
String metricName;
if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
String[] pipelines = pipelinesStr.split(",");
for (String pipeline : pipelines) {
metricName = getMetricName(clusterName, process.getName(), pipeline);
metricNotificationService.deleteMetric(metricName);
}
} else {
metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE);
metricNotificationService.deleteMetric(metricName);
}
}
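/**
 * On entity update, stops tracking the process if its SLA was removed; otherwise keeps
 * (or starts) tracking it.
 */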
@Override
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
if (oldEntity.getEntityType() != EntityType.PROCESS) {
return;
}
Process newProcess = (Process) newEntity;
Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null) {
if (oldProcess.getSla() != null) {
backlogMetricStore.deleteEntityInstance(newProcess.getName());
entityBacklogs.remove(newProcess);
for (Cluster cluster : oldProcess.getClusters().getClusters()) {
dropMetric(cluster.getName(), oldProcess);
}
}
} else {
addToBacklog(newEntity);
}
}
@Override
public void onReload(Entity entity) throws FalconException {
addToBacklog(entity);
}
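/**
 * Registers a process for backlog tracking; entities without an SLA are ignored.
 */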
public void addToBacklog(Entity entity) {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() == null) {
return;
}
entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
}
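/**
 * Records an SLA miss for a process instance, persisting it in the metric store and
 * adding it to the in-memory backlog.
 */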
@Override
public void highSLAMissed(String entityName, String clusterName, EntityType entityType,
Date nominalTime) throws FalconException {
if (entityType != EntityType.PROCESS) {
return;
}
Entity entity = EntityUtil.getEntity(entityType, entityName);
entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
List<MetricInfo> metricInfoList = entityBacklogs.get(entity);
String nominalTimeStr = DATE_FORMAT.get().format(nominalTime);
MetricInfo metricInfo = new MetricInfo(nominalTimeStr, clusterName);
synchronized (metricInfoList) {
// Check-and-add under the list lock so concurrent SLA misses cannot insert duplicates.
if (!metricInfoList.contains(metricInfo)) {
backlogMetricStore.addInstance(entityName, clusterName, nominalTime, entityType);
metricInfoList.add(metricInfo);
}
}
}
@Override
public String getName() {
return this.getClass().getSimpleName();
}
@Override
public void init() throws FalconException {
initInstances();
int emitInterval = Integer.parseInt(StartupProperties.get().getProperty(BACKLOG_METRIC_EMIT_INTERVAL,
"60000"));
int recheckInterval = Integer.parseInt(StartupProperties.get().getProperty(BACKLOG_METRIC_RECHECK_INTERVAL,
"60000"));
scheduledThreadPoolExecutor1.scheduleAtFixedRate(new BacklogMetricEmitter(),
1, emitInterval, TimeUnit.MILLISECONDS);
scheduledThreadPoolExecutor2.scheduleAtFixedRate(new BacklogCheckService(),
1, recheckInterval, TimeUnit.MILLISECONDS);
}
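/**
 * Restores the in-memory backlog from the instances persisted in the state store,
 * so that tracking survives server restarts.
 */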
private void initInstances() throws FalconException {
LOG.info("Initializing backlog instances from state store");
Map<Entity, List<MetricInfo>> backlogInstances = backlogMetricStore.getAllInstances();
if (backlogInstances != null && !backlogInstances.isEmpty()) {
for (Map.Entry<Entity, List<MetricInfo>> entry : backlogInstances.entrySet()) {
List<MetricInfo> metricsInDB = entry.getValue();
List<MetricInfo> metricInfoList = Collections.synchronizedList(metricsInDB);
entityBacklogs.put(entry.getKey(), metricInfoList);
LOG.debug("Initializing backlog for entity {}", entry.getKey().getName());
}
}
}
@Override
public void destroy() throws FalconException {
scheduledThreadPoolExecutor1.shutdown();
scheduledThreadPoolExecutor2.shutdown();
}
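/**
 * On successful completion of an instance, removes it from the backlog; once the
 * backlog for the entity is empty, publishes a zero metric and stops tracking it.
 */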
@Override
public synchronized void onSuccess(WorkflowExecutionContext context) throws FalconException {
Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
if (entityBacklogs.containsKey(entity)) {
List<MetricInfo> metrics = entityBacklogs.get(entity);
synchronized (metrics) {
Date date = SchemaHelper.parseDateUTC(context.getNominalTimeAsISO8601());
backlogMetricStore.deleteMetricInstance(entity.getName(), context.getClusterName(),
date, entity.getEntityType());
metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), context.getClusterName()));
if (metrics.isEmpty()) {
entityBacklogs.remove(entity);
publishBacklog((Process) entity, context.getClusterName(), 0L);
}
}
}
}
@Override
public void onFailure(WorkflowExecutionContext context) throws FalconException {
// Do Nothing
}
@Override
public void onStart(WorkflowExecutionContext context) throws FalconException {
// Do Nothing
}
@Override
public void onSuspend(WorkflowExecutionContext context) throws FalconException {
// Do Nothing
}
@Override
public void onWait(WorkflowExecutionContext context) throws FalconException {
// Do Nothing
}
/**
 * Service that evaluates backlogs and publishes metrics to Graphite in parallel across entities.
 */
public static class BacklogMetricEmitter implements Runnable {
private ThreadPoolExecutor executor;
@Override
public void run() {
LOG.debug("BacklogMetricEmitter running for entities");
executor = new ScheduledThreadPoolExecutor(10);
List<Future<?>> futures = new ArrayList<>();
try {
// Calculate and publish the backlog of every tracked entity in parallel.
for (Entity entity : entityBacklogs.keySet()) {
futures.add(executor.submit(new BacklogCalcService(entity, entityBacklogs.get(entity))));
}
waitForFuturesToComplete(futures);
} finally {
executor.shutdown();
}
}
private void waitForFuturesToComplete(List<Future<?>> futures) {
try {
for (Future<?> future : futures) {
future.get();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag for the scheduler thread
LOG.error("Interrupted while waiting for backlog calculation tasks", e);
} catch (ExecutionException e) {
LOG.error("Error while executing backlog calculation tasks", e);
}
}
}
/**
 * Service that calculates the backlog for a given entity and publishes it to Graphite.
 */
public static class BacklogCalcService implements Runnable {
private Entity entityObj;
private List<MetricInfo> metrics;
BacklogCalcService(Entity entity, List<MetricInfo> metricInfoList) {
this.entityObj = entity;
this.metrics = metricInfoList;
}
@Override
public void run() {
MetricInfo metricInfo = null;
HashMap<String, Long> backLogsCluster = new HashMap<>();
synchronized (metrics) {
if (metrics.isEmpty()) {
// No pending instances: reset the backlog gauge to zero for every cluster.
Process process = (Process) entityObj;
Clusters clusters = process.getClusters();
for (Cluster cluster : clusters.getClusters()) {
publishBacklog(process, cluster.getName(), 0L);
}
} else {
long currentTime = System.currentTimeMillis();
Iterator<MetricInfo> iter = metrics.iterator();
while (iter.hasNext()) {
try {
metricInfo = iter.next();
long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
// Per-cluster backlog is the sum of (now - nominalTime) over all pending instances.
long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
? backLogsCluster.get(metricInfo.getCluster()) : 0;
backlog += (currentTime - time);
backLogsCluster.put(metricInfo.getCluster(), backlog);
} catch (ParseException e) {
LOG.error("Unable to parse nominal time: " + metricInfo.getNominalTime(), e);
}
}
}
}
Process process = (Process) entityObj;
if (!backLogsCluster.isEmpty()) {
for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) {
String clusterName = entry.getKey();
Long backlog = entry.getValue() / (60 * 1000L); // convert milliseconds to minutes
publishBacklog(process, clusterName, backlog);
}
}
}
}
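/**
 * Publishes the backlog value (in minutes) to Graphite, once per pipeline of the
 * process, or under the DEFAULT pipeline when none is configured.
 */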
public static void publishBacklog(Process process, String clusterName, Long backlog) {
String pipelinesStr = process.getPipelines();
String metricName;
if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
String[] pipelines = pipelinesStr.split(",");
for (String pipeline : pipelines) {
metricName = getMetricName(clusterName, process.getName(), pipeline);
metricNotificationService.publish(metricName, backlog);
}
} else {
metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE);
metricNotificationService.publish(metricName, backlog);
}
}
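/**
 * Builds the Graphite metric name, of the form
 * {@code <prefix>.<cluster>.<pipeline>.EXECUTION.<process>.backlogInMins}.
 */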
public static String getMetricName(String clusterName, String processName, String pipeline) {
return METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+ METRIC_SEPARATOR + processName + METRIC_SEPARATOR + "backlogInMins";
}
/**
 * Service that runs periodically and removes succeeded instances from the backlog list.
 */
public static class BacklogCheckService implements Runnable {
@Override
public void run() {
LOG.trace("BacklogCheckService running for entities");
try {
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
for (Entity entity : entityBacklogs.keySet()) {
List<MetricInfo> metrics = entityBacklogs.get(entity);
if (!metrics.isEmpty()) {
synchronized (metrics) {
Iterator<MetricInfo> iterator = metrics.iterator();
while (iterator.hasNext()) {
MetricInfo metricInfo = iterator.next();
String nominalTimeStr = metricInfo.getNominalTime();
Date nominalTime;
try {
nominalTime = DATE_FORMAT.get().parse(nominalTimeStr);
authenticateUser(entity);
if (wfEngine.isMissing(entity)) {
LOG.info("Entity of name {} was deleted so removing instance of "
+ "nominaltime {} ", entity.getName(), nominalTimeStr);
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
iterator.remove();
continue;
}
InstancesResult status = wfEngine.getStatus(entity, nominalTime,
new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
if (status.getInstances().length > 0
&& status.getInstances()[0].status == InstancesResult.WorkflowStatus.SUCCEEDED) {
LOG.debug("Instance of nominaltime {} of entity {} has succeeded, removing "
+ "from backlog entries", nominalTimeStr, entity.getName());
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
iterator.remove();
}
} catch (ParseException e) {
LOG.error("Unable to parse date: " + nominalTimeStr, e);
}
}
}
}
}
} catch (Throwable e) {
LOG.error("Error while checking backlog metrics", e);
}
}
}
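/**
 * Ensures a user context is set before workflow-engine calls: prefers the entity's ACL
 * owner, falling back to the JVM user.
 */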
private static void authenticateUser(Entity entity) {
if (!CurrentUser.isAuthenticated()) {
if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
CurrentUser.authenticate(entity.getACL().getOwner());
} else {
CurrentUser.authenticate(System.getProperty("user.name"));
}
}
}
}