blob: ba7ef5d0ac02752556fbd99a6106d3b7dbf1b224 [file] [log] [blame]
package org.apache.helix.monitoring.mbeans;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.List;
import javax.management.JMException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
import org.apache.helix.task.TaskState;
import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
public class WorkflowMonitor extends DynamicMBeanProvider {
private static final String MBEAN_DESCRIPTION = "Workflow Monitor";
private static final String WORKFLOW_KEY = "Workflow";
static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
private String _clusterName;
private String _workflowType;
private SimpleDynamicMetric<Long> _successfulWorkflowCount;
private SimpleDynamicMetric<Long> _failedWorkflowCount;
private SimpleDynamicMetric<Long> _failedWorkflowGauge;
private SimpleDynamicMetric<Long> _existingWorkflowGauge;
private SimpleDynamicMetric<Long> _queuedWorkflowGauge;
private SimpleDynamicMetric<Long> _runningWorkflowGauge;
private SimpleDynamicMetric<Long> _totalWorkflowLatencyCount;
private SimpleDynamicMetric<Long> _maximumWorkflowLatencyGauge;
private SimpleDynamicMetric<Long> _lastResetTime;
public WorkflowMonitor(String clusterName, String workflowType) {
_clusterName = clusterName;
_workflowType = workflowType;
_successfulWorkflowCount = new SimpleDynamicMetric("SuccessfulWorkflowCount", 0L);
_failedWorkflowCount = new SimpleDynamicMetric("FailedWorkflowCount", 0L);
_failedWorkflowGauge = new SimpleDynamicMetric("FailedWorkflowGauge", 0L);
_existingWorkflowGauge = new SimpleDynamicMetric("ExistingWorkflowGauge", 0L);
_queuedWorkflowGauge = new SimpleDynamicMetric("QueuedWorkflowGauge", 0L);
_runningWorkflowGauge = new SimpleDynamicMetric("RunningWorkflowGauge", 0L);
_totalWorkflowLatencyCount = new SimpleDynamicMetric("TotalWorkflowLatencyCount", 0L);
_maximumWorkflowLatencyGauge = new SimpleDynamicMetric("MaximumWorkflowLatencyGauge", 0L);
_lastResetTime = new SimpleDynamicMetric("LastResetTime", System.currentTimeMillis());
}
@Override
public String getSensorName() {
return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType);
}
/**
* Update workflow with transition state
* @param to The to state of a workflow
*/
public void updateWorkflowCounters(TaskState to) {
updateWorkflowCounters(to, 0);
}
public void updateWorkflowCounters(TaskState to, long latency) {
if (to.equals(TaskState.FAILED)) {
incrementSimpleDynamicMetric(_failedWorkflowCount, 1);
} else if (to.equals(TaskState.COMPLETED)) {
incrementSimpleDynamicMetric(_successfulWorkflowCount, 1);
// Only record latency larger than 0 and succeeded workflows
incrementSimpleDynamicMetric(_maximumWorkflowLatencyGauge,
_maximumWorkflowLatencyGauge.getValue() > latency ? 0
: latency - _maximumWorkflowLatencyGauge.getValue());
incrementSimpleDynamicMetric(_totalWorkflowLatencyCount, latency > 0 ? latency : 0);
}
}
/**
* Reset gauges
*/
public void resetGauges() {
_failedWorkflowGauge.updateValue(0L);
_existingWorkflowGauge.updateValue(0L);
_runningWorkflowGauge.updateValue(0L);
_queuedWorkflowGauge.updateValue(0L);
if (_lastResetTime.getValue() + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
_lastResetTime.updateValue(System.currentTimeMillis());
_maximumWorkflowLatencyGauge.updateValue(0L);
}
}
/**
* Refresh gauges via transition state
* @param current current workflow state
*/
public void updateWorkflowGauges(TaskState current) {
if (current == null || current.equals(TaskState.NOT_STARTED)) {
incrementSimpleDynamicMetric(_queuedWorkflowGauge);
} else if (current.equals(TaskState.IN_PROGRESS)) {
incrementSimpleDynamicMetric(_runningWorkflowGauge);
} else if (current.equals(TaskState.FAILED)) {
incrementSimpleDynamicMetric(_failedWorkflowGauge);
}
incrementSimpleDynamicMetric(_existingWorkflowGauge);
}
// All the get functions are for testing purpose only.
public long getSuccessfulWorkflowCount() {
return _successfulWorkflowCount.getValue();
}
public long getFailedWorkflowCount() {
return _failedWorkflowCount.getValue();
}
public long getFailedWorkflowGauge() {
return _failedWorkflowGauge.getValue();
}
public long getExistingWorkflowGauge() {
return _existingWorkflowGauge.getValue();
}
public long getQueuedWorkflowGauge() {
return _queuedWorkflowGauge.getValue();
}
public long getRunningWorkflowGauge() {
return _runningWorkflowGauge.getValue();
}
@Override
public DynamicMBeanProvider register() throws JMException {
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
attributeList.add(_successfulWorkflowCount);
attributeList.add(_failedWorkflowCount);
attributeList.add(_failedWorkflowGauge);
attributeList.add(_existingWorkflowGauge);
attributeList.add(_queuedWorkflowGauge);
attributeList.add(_runningWorkflowGauge);
attributeList.add(_totalWorkflowLatencyCount);
attributeList.add(_maximumWorkflowLatencyGauge);
attributeList.add(_lastResetTime);
doRegister(attributeList, MBEAN_DESCRIPTION, getObjectName(_workflowType));
return this;
}
private ObjectName getObjectName(String workflowType) throws MalformedObjectNameException {
return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), String
.format("%s, %s=%s", String.format("%s=%s", CLUSTER_DN_KEY, _clusterName),
WORKFLOW_TYPE_DN_KEY, workflowType)));
}
}